X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=o2dms%2Fviews%2Fdms_lcm_view.py;h=ab84fbaefecd839b284db60aee20f461e73b001c;hb=c3ab2dcb442a0e66b7a621c59c6c7a679c26cee9;hp=32f77cc62d386b6eefcce9aff3ee762224c8525e;hpb=66318dfadc702f79ee8bc2a7c2289803b1b01033;p=pti%2Fo2.git diff --git a/o2dms/views/dms_lcm_view.py b/o2dms/views/dms_lcm_view.py index 32f77cc..ab84fba 100644 --- a/o2dms/views/dms_lcm_view.py +++ b/o2dms/views/dms_lcm_view.py @@ -13,10 +13,12 @@ # limitations under the License. from sqlalchemy import select - +import uuid from o2common.service import unit_of_work from o2ims.adapter.orm import deploymentmanager from o2dms.adapter.orm import nfDeploymentDesc +from o2dms.views.dms_dto import DmsLcmNfDeploymentDescriptorDTO +from o2dms.domain.dms import NfDeploymentDesc def deployment_managers(uow: unit_of_work.AbstractUnitOfWork): @@ -43,29 +45,49 @@ def lcm_nfdeploymentdesc_list(deploymentManagerID: str, def lcm_nfdeploymentdesc_one(nfdeploymentdescriptorid: str, - deploymentManagerID: str, uow: unit_of_work.AbstractUnitOfWork): with uow: - res = uow.session.execute(select(deploymentmanager).where( - nfDeploymentDesc.c.deploymentManagerId == deploymentManagerID, + res = uow.session.execute(select(nfDeploymentDesc).where( nfDeploymentDesc.c.id == nfdeploymentdescriptorid)) first = res.first() return None if first is None else dict(first) -# def lcm_nfdeploymentdesc_create(nfdeploymentdescriptorid: str, -# uow: unit_of_work.AbstractUnitOfWork): -# with uow: -# res = uow.session.execute(select(deploymentmanager).where( -# deploymentmanager.c.id == nfdeploymentdescriptorid)) -# first = res.first() -# return None if first is None else dict(first) +def lcm_nfdeploymentdesc_create( + deploymentManagerId: str, + input: DmsLcmNfDeploymentDescriptorDTO. + NfDeploymentDescriptor_create, + uow: unit_of_work.AbstractUnitOfWork): + + id = str(uuid.uuid4()) + entity = NfDeploymentDesc( + id, input['name'], deploymentManagerId, input['description'], + input['inputParams'], input['outputParams']) + with uow: + uow.nfdeployment_descs.add(entity) + uow.commit() + return id + +def lcm_nfdeploymentdesc_update( + nfdeploymentdescriptorid: str, + input: DmsLcmNfDeploymentDescriptorDTO.NfDeploymentDescriptor_update, + uow: unit_of_work.AbstractUnitOfWork): -# def lcm_nfdeploymentdesc_delete(nfdeploymentdescriptorid: str, -# uow: unit_of_work.AbstractUnitOfWork): -# with uow: -# res = uow.session.execute(select(deploymentmanager).where( -# deploymentmanager.c.id == nfdeploymentdescriptorid)) -# first = res.first() -# return None if first is None else dict(first) + with uow: + entity = uow.nfdeployment_descs.get(nfdeploymentdescriptorid) + entity.name = input['name'] + entity.description = input['description'] + entity.inputParams = input['inputParams'] + entity.outputParams = input['outputParams'] + uow.commit() + return True + + +def lcm_nfdeploymentdesc_delete( + nfdeploymentdescriptorid: str, uow: unit_of_work.AbstractUnitOfWork): + + with uow: + uow.nfdeployment_descs.delete(nfdeploymentdescriptorid) + uow.commit() + return True