X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;ds=inline;f=o2dms%2Fviews%2Fdms_lcm_view.py;fp=o2dms%2Fviews%2Fdms_lcm_view.py;h=186c9c343c1d82aee3a0f3f6e0d75bce5a7d8fc9;hb=7fb3d02a3744ecb368b0531f16175034b987d42f;hp=ab84fbaefecd839b284db60aee20f461e73b001c;hpb=c3ab2dcb442a0e66b7a621c59c6c7a679c26cee9;p=pti%2Fo2.git diff --git a/o2dms/views/dms_lcm_view.py b/o2dms/views/dms_lcm_view.py index ab84fba..186c9c3 100644 --- a/o2dms/views/dms_lcm_view.py +++ b/o2dms/views/dms_lcm_view.py @@ -16,9 +16,10 @@ from sqlalchemy import select import uuid from o2common.service import unit_of_work from o2ims.adapter.orm import deploymentmanager -from o2dms.adapter.orm import nfDeploymentDesc +from o2dms.adapter.orm import nfDeploymentDesc, nfDeployment, nfOCloudVResource from o2dms.views.dms_dto import DmsLcmNfDeploymentDescriptorDTO -from o2dms.domain.dms import NfDeploymentDesc +from o2dms.views.dms_dto import DmsLcmNfDeploymentDTO +from o2dms.domain.dms import NfDeploymentDesc, NfDeployment def deployment_managers(uow: unit_of_work.AbstractUnitOfWork): @@ -91,3 +92,77 @@ def lcm_nfdeploymentdesc_delete( uow.nfdeployment_descs.delete(nfdeploymentdescriptorid) uow.commit() return True + + +def lcm_nfdeployment_list( + deploymentManagerID: str, uow: unit_of_work.AbstractUnitOfWork): + with uow: + res = uow.session.execute(select(nfDeployment).where( + nfDeployment.c.deploymentManagerId == deploymentManagerID)) + return [dict(r) for r in res] + + +def lcm_nfdeployment_one( + nfdeploymentid: str, uow: unit_of_work.AbstractUnitOfWork): + with uow: + res = uow.session.execute(select(nfDeployment).where( + nfDeployment.c.id == nfdeploymentid)) + first = res.first() + return None if first is None else dict(first) + + +def lcm_nfdeployment_create( + deploymentManagerId: str, + input: DmsLcmNfDeploymentDTO. + NfDeployment_create, + uow: unit_of_work.AbstractUnitOfWork): + + id = str(uuid.uuid4()) + entity = NfDeployment( + id, input['name'], deploymentManagerId, input['description'], + input['descriptorId'], input['parentDeploymentId']) + with uow: + uow.nfdeployments.add(entity) + uow.commit() + return id + + +def lcm_nfdeployment_update( + nfdeploymentdescriptorid: str, + input: DmsLcmNfDeploymentDTO.NfDeployment_update, + uow: unit_of_work.AbstractUnitOfWork): + + with uow: + entity = uow.nfdeployments.get(nfdeploymentdescriptorid) + entity.name = input['name'] + entity.description = input['description'] + entity.inputParams = input['descriptorId'] + entity.outputParams = input['parentDeploymentId'] + uow.commit() + return True + + +def lcm_nfdeployment_delete( + nfdeploymentdescriptorid: str, uow: unit_of_work.AbstractUnitOfWork): + + with uow: + uow.nfdeployments.delete(nfdeploymentdescriptorid) + uow.commit() + return True + + +def lcm_nfocloudvresource_list( + nfDeploymentId: str, uow: unit_of_work.AbstractUnitOfWork): + with uow: + res = uow.session.execute(select(nfOCloudVResource).where( + nfOCloudVResource.c.nfDeploymentId == nfDeploymentId)) + return [dict(r) for r in res] + + +def lcm_nfocloudvresource_one( + ocloudvresourceId: str, uow: unit_of_work.AbstractUnitOfWork): + with uow: + res = uow.session.execute(select(nfOCloudVResource).where( + nfOCloudVResource.c.id == ocloudvresourceId)) + first = res.first() + return None if first is None else dict(first)