from o2app import bootstrap
from o2ims.views import configure_namespace as ims_route_configure_namespace
+from o2common.views.route_exception import configure_exception
from o2ims.adapter.clients.alarm_dict_client import load_alarm_definition
from o2common.authmw import authmiddleware
app.config.SWAGGER_UI_DOC_EXPANSION = 'list'
# app.config['RESTX_MASK_HEADER'] = 'fields'
app.config['RESTX_MASK_SWAGGER'] = False
+app.config['ERROR_INCLUDE_MESSAGE'] = False
api = Api(app, version=FLASK_API_VERSION,
title='INF O2 Services API',
description='Swagger OpenAPI document for the INF O2 Services',
)
bus = bootstrap.bootstrap()
+configure_exception(api)
ims_route_configure_namespace(api)
load_alarm_definition(bus.uow)
from flask_restx.model import Model
from flask_restx.fields import List, Nested, String
from flask_restx.utils import unpack
-from o2common.domain.base import Serializer
from o2common.helper import o2logging
logger = o2logging.get_logger(__name__)
all_fields_without_space = kwargs['all_fields'].replace(" ", "")
all_fields = all_fields_without_space.lower()
if 'true' == all_fields:
- mask_val = ''
+ selector = self.__gen_selector_from_model_with_value(
+ self.fields)
+ mask_val = self.__gen_mask_from_selector(selector)
elif 'fields' in kwargs and kwargs['fields'] != '':
fields_without_space = kwargs['fields'].replace(" ", "")
selector = {}
self.__update_selector_value(selector, fields_without_space, True)
+ self.__set_default_mask(selector)
mask_val = self.__gen_mask_from_selector(selector)
self.__update_selector_value(
selector, exclude_fields_without_space, False)
+ self.__set_default_mask(selector)
mask_val = self.__gen_mask_from_selector(selector)
elif 'exclude_default' in kwargs and kwargs['exclude_default'] != '':
selector[i] = default_val
return selector
- def __update_selector_value(self, default_selector: dict, filter: str,
+ def __update_selector_value(self, selector: dict, filter: str,
val: bool):
fields = filter.split(',')
for f in fields:
if '/' in f:
- self.__update_selector_tree_value(default_selector, f, val)
+ self.__update_selector_tree_value(selector, f, val)
continue
- default_selector[f] = val
+ selector[f] = val
def __update_selector_tree_value(self, m: dict, filter: str, val: bool):
filter_list = filter.split('/', 1)
return '{%s}' % ','.join(mask_li)
-
-class ProblemDetails(Serializer):
- def __init__(self, namespace: O2Namespace, code: int, detail: str,
- title=None, instance=None
- ) -> None:
- self.ns = namespace
- self.status = code
- self.detail = detail
- self.type = request.path
- self.title = title if title is not None else self.getTitle(code)
- self.instance = instance if instance is not None else []
-
- def getTitle(self, code):
- return HTTPStatus(code).phrase
-
- def abort(self):
- self.ns.abort(self.status, self.detail, **self.serialize())
-
- def serialize(self):
- details = {}
- for key in dir(self):
- if key == 'ns' or key.startswith('__') or\
- callable(getattr(self, key)):
- continue
- else:
- details[key] = getattr(self, key)
- return details
+ def __set_default_mask(self, selector: dict, val: bool = True):
+ default_selector = str(getattr(self.fields, "__mask__"))[1:-1]
+ self.__update_selector_value(selector, default_selector, val)
--- /dev/null
+# Copyright (C) 2021-2022 Wind River Systems, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from flask import request
+from flask_restx._http import HTTPStatus
+from werkzeug.exceptions import (
+ BadRequest,
+ MethodNotAllowed,
+ NotFound,
+ InternalServerError,
+)
+
+
+from o2common.helper import o2logging
+logger = o2logging.get_logger(__name__)
+
+
+class BadRequestException(BadRequest):
+ def __init__(self, desc=None, resp=None):
+ super().__init__(description=desc, response=resp)
+
+
+class NotFoundException(NotFound):
+ def __init__(self, desc=None, resp=None):
+ super().__init__(description=desc, response=resp)
+
+
+class ProblemDetails():
+ def __init__(self, code: int, detail: str,
+ title=None, instance=None
+ ) -> None:
+ self.status = code
+ self.detail = detail
+ self.type = request.path
+ self.title = title if title is not None else self.getTitle(code)
+ self.instance = instance if instance is not None else []
+
+ def getTitle(self, code):
+ return HTTPStatus(code).phrase
+
+ def serialize(self):
+ details = {}
+ for key in dir(self):
+ if key == 'ns' or key.startswith('__') or \
+ callable(getattr(self, key)):
+ continue
+ else:
+ details[key] = getattr(self, key)
+ return details
+
+
+def configure_exception(app):
+
+ @app.errorhandler(BadRequestException)
+ def handle_badrequest_exception(error):
+ '''Return a custom message and 400 status code'''
+ type(error)
+ problem = ProblemDetails(400, str(error))
+ return problem.serialize(), 400
+
+ @app.errorhandler(NotFoundException)
+ def handle_notfound_exception(error):
+ '''Return a custom message and 404 status code'''
+ problem = ProblemDetails(404, str(error))
+ return problem.serialize(), 404
+
+ @app.errorhandler(MethodNotAllowed)
+ def handle_methodnotallowed_exception(error):
+ '''Return a custom message and 405 status code'''
+ problem = ProblemDetails(405, "Method not allowed")
+ return problem.serialize(), 405
+
+ @app.errorhandler(InternalServerError)
+ def handle_internalservererror_exception(error):
+ '''Return a custom message and 500 status code'''
+ problem = ProblemDetails(500, "Internal Server Error")
+ return problem.serialize(), 500
from sqlalchemy.sql.elements import ColumnElement
from sqlalchemy import or_
+from o2common.views.route_exception import BadRequestException
+
from o2common.helper import o2logging
logger = o2logging.get_logger(__name__)
if not hasattr(obj, key):
logger.warning('Filter attrName %s not in Object %s.' %
(key, str(obj)))
- return []
+ raise BadRequestException(
+ 'Filter attrName {} not in the Object'.format(key))
if operation in ['eq', 'neq', 'gt', 'lt', 'gte', 'lte']:
if len(values) != 1:
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
+from . import api_ns, ocloud_route, alarm_route
from o2common.config import config
-from . import ocloud_route, alarm_route
-from . import api_ns
from o2common.helper import o2logging
logger = o2logging.get_logger(__name__)
from o2common.service.messagebus import MessageBus
from o2common.views.pagination_route import link_header, PAGE_PARAM
+from o2common.views.route_exception import NotFoundException
from o2ims.views import alarm_view
from o2ims.views.api_ns import api_ims_monitoring as api_monitoring_v1
from o2ims.views.alarm_dto import AlarmDTO, SubscriptionDTO
model = AlarmDTO.alarm_event_record_get
- @api_monitoring_v1.doc('Get resource type')
+ @api_monitoring_v1.doc('Get AlarmEventRecord')
@api_monitoring_v1.marshal_with(model)
def get(self, alarmEventRecordId):
result = alarm_view.alarm_event_record_one(alarmEventRecordId, bus.uow)
if result is not None:
return result
- api_monitoring_v1.abort(
- 404, "Resource type {} doesn't exist".format(alarmEventRecordId))
+ raise NotFoundException(
+ "Alarm Event Record {} doesn't exist".format(alarmEventRecordId))
# ---------- Alarm Subscriptions ---------- #
alarmSubscriptionID, bus.uow)
if result is not None:
return result
- api_monitoring_v1.abort(404, "Subscription {} doesn't exist".format(
- alarmSubscriptionID))
+ raise NotFoundException(
+ "Subscription {} doesn't exist".format(alarmSubscriptionID))
@api_monitoring_v1.doc('Delete subscription by ID')
- @api_monitoring_v1.response(204, 'Subscription deleted')
+ @api_monitoring_v1.response(200, 'Subscription deleted')
def delete(self, alarmSubscriptionID):
result = alarm_view.subscription_delete(alarmSubscriptionID, bus.uow)
- return result, 204
+ return result, 200
# 'deploymentManagers': fields.String,
# 'smoRegistrationService': fields.String
'extensions': fields.String
- }
+ },
+ mask='{oCloudId,globalcloudId,name,description,serviceUri}'
)
# 'resourceKind': fields.String,
# 'resourceClass': fields.String,
'extensions': fields.String
- }
+ },
+ mask='{resourceTypeId,name,description,model,vendor,version}'
)
'location': fields.String,
# 'resources': fields.String,
'extensions': fields.String
- }
+ },
+ mask='{resourcePoolId,oCloudId,globalLocationId,name,description}'
)
'description': fields.String,
# 'elements': fields.String,
'extensions': fields.String
- }
+ },
+ mask='{resourceId,resourcePoolId,resourceTypeId,description,parentId}'
)
def recursive_resource_mapping(iteration_number=2):
fields.Nested(ResourceDTO.recursive_resource_mapping(
iteration_number-1)), attribute='children')
return api_ims_inventory_v1.model(
- 'ResourceGetDto' + str(iteration_number), resource_json_mapping)
+ 'ResourceGetDto' + str(iteration_number), resource_json_mapping,
+ mask='{resourceId,resourcePoolId,resourceTypeId,description,' +
+ 'parentId}')
class DeploymentManagerDTO:
description='Profile support list, use default for the return \
endpoint'),
'extensions': fields.String
- }
+ },
+ mask='{deploymentManagerId,name,description,oCloudId,serviceUri,' + \
+ 'profileSupportList}'
)
profile = api_ims_inventory_v1.model("DeploymentManagerGetDtoProfile", {
'profileName': fields.String,
'profileData': fields.Nested(profile, False, True),
'extensions': fields.String
- }
+ },
+ mask='{deploymentManagerId,name,description,oCloudId,serviceUri,' +\
+ 'profileName,profileData}'
)
'callback': fields.String,
'consumerSubscriptionId': fields.String,
'filter': fields.String,
- }
+ },
+ mask='{subscriptionId,callback}'
)
subscription = api_ims_inventory_v1.model(
from o2common.service.messagebus import MessageBus
from o2common.views.pagination_route import link_header, PAGE_PARAM
-from o2common.views.route import ProblemDetails
+from o2common.views.route_exception import NotFoundException
from o2ims.views import ocloud_view
from o2ims.views.api_ns import api_ims_inventory as api_ims_inventory_v1
from o2ims.views.ocloud_dto import OcloudDTO, ResourceTypeDTO,\
res = ocloud_view.oclouds(bus.uow)
if len(res) > 0:
return res[0]
- ProblemDetails(
- api_ims_inventory_v1,
- 404, "oCloud doesn't exist").abort()
+ raise NotFoundException("oCloud doesn't exist")
# ---------- ResourceTypes ---------- #
result = ocloud_view.resource_type_one(resourceTypeID, bus.uow)
if result is not None:
return result
- ProblemDetails(
- api_ims_inventory_v1,
- 404, "Resource type {} doesn't exist".format(
- resourceTypeID)).abort()
+ raise NotFoundException("Resource type {} doesn't exist".format(
+ resourceTypeID))
# ---------- ResourcePools ---------- #
result = ocloud_view.resource_pool_one(resourcePoolID, bus.uow)
if result is not None:
return result
- ProblemDetails(
- api_ims_inventory_v1,
- 404, "Resource pool {} doesn't exist".format(
- resourcePoolID)).abort()
+ raise NotFoundException("Resource pool {} doesn't exist".format(
+ resourcePoolID))
# ---------- Resources ---------- #
result = ocloud_view.resource_one(resourceID, bus.uow)
if result is not None:
return result
- ProblemDetails(
- api_ims_inventory_v1,
- 404, "Resource {} doesn't exist".format(
- resourceID)).abort()
+ raise NotFoundException("Resource {} doesn't exist".format(
+ resourceID))
# ---------- DeploymentManagers ---------- #
deploymentManagerID, bus.uow, profile)
if result is not None:
return result
-
- ProblemDetails(
- api_ims_inventory_v1,
- 404, "Deployment manager {} doesn't exist".format(
- deploymentManagerID)).abort()
+ raise NotFoundException("Deployment manager {} doesn't exist".format(
+ deploymentManagerID))
# ---------- Subscriptions ---------- #
subscriptionID, bus.uow)
if result is not None:
return result
- ProblemDetails(
- api_ims_inventory_v1,
- 404, "Subscription {} doesn't exist".format(
- subscriptionID)).abort()
+ raise NotFoundException("Subscription {} doesn't exist".format(
+ subscriptionID))
@api_ims_inventory_v1.doc('Delete subscription by ID')
- @api_ims_inventory_v1.response(204, 'Subscription deleted')
+ @api_ims_inventory_v1.response(200, 'Subscription deleted')
def delete(self, subscriptionID):
result = ocloud_view.subscription_delete(subscriptionID, bus.uow)
- return result, 204
+ return result, 200
subscription_id1 = str(uuid.uuid4())
resp = client.delete(apibase+"/alarmSubscriptions/"+subscription_id1)
- assert resp.status_code == 204
+ assert resp.status_code == 200
def test_flask_not_allowed(mock_flask_uow):
subscription_id1 = str(uuid.uuid4())
resp = client.delete(apibase+"/subscriptions/"+subscription_id1)
- assert resp.status_code == 204
+ assert resp.status_code == 200
def test_flask_not_allowed(mock_flask_uow):