from werkzeug.wrappers import Request, Response
from o2common.helper import o2logging
from o2common.authmw.authprov import auth_definer
+from flask_restx._http import HTTPStatus
+import json
logger = o2logging.get_logger(__name__)
'WWW-Authenticate': '{}'.format(self.value)}
+class AuthProblemDetails():
+ def __init__(self, code: int, detail: str, path: str,
+ title=None, instance=None
+ ) -> None:
+ self.status = code
+ self.detail = detail
+ self.type = path
+ self.title = title if title is not None else self.getTitle(code)
+ self.instance = instance if instance is not None else []
+
+ def getTitle(self, code):
+ return HTTPStatus(code).phrase
+
+ def serialize(self):
+ details = {}
+ for key in dir(self):
+ if key == 'ns' or key.startswith('__') or \
+ callable(getattr(self, key)):
+ continue
+ else:
+ details[key] = getattr(self, key)
+ return json.dumps(details, indent=True)
+
+
class AuthFailureExp(Exception):
def __init__(self, value):
self.value = value
'WWW-Authenticate': '{}'.format(self.value)}
-def _response_wrapper(environ, start_response, header):
+def _response_wrapper(environ, start_response, header, detail):
res = Response(headers=header,
- mimetype='text/plain', status=401)
+ mimetype='application/json', status=401, response=detail)
return res(environ, start_response)
-def _internal_err_response_wrapper(environ, start_response):
- res = Response(mimetype='text/plain', status=500)
+def _internal_err_response_wrapper(environ, start_response, detail):
+ res = Response(mimetype='application/json', status=500, response=detail)
return res(environ, start_response)
logger.info(__name__ + 'authentication middleware')
req = Request(environ, populate_request=True, shallow=True)
try:
- auth_header = req.headers['Authorization']
-
+ auth_header = req.headers.get('Authorization', None)
if auth_header:
auth_token = auth_header.split(" ")[1]
logger.error(
'Internal exception happend \
ed {}'.format(str(ex)), exc_info=True)
+ prb = AuthProblemDetails(
+ 500, 'Internal error.', req.path)
return \
- _internal_err_response_wrapper(environ,
- start_response)
+ _internal_err_response_wrapper(
+ environ,
+ start_response, prb.serialize())
else:
raise AuthFailureExp(
'Bearer realm="Authentication Failed"')
else:
raise AuthRequiredExp('Bearer realm="Authentication Required"')
except AuthRequiredExp as ex:
- return _response_wrapper(environ, start_response, ex.dictize())
+ prb = AuthProblemDetails(401, ex.value, req.path)
+ return _response_wrapper(environ, start_response,
+ ex.dictize(), prb.serialize())
except AuthFailureExp as ex:
- return _response_wrapper(environ, start_response, ex.dictize())
+ prb = AuthProblemDetails(401, ex.value, req.path)
+ return _response_wrapper(environ, start_response,
+ ex.dictize(), prb.serialize())
except Exception as ex:
logger.error('Internal exception happended {}'.format(
str(ex)), exc_info=True)
- return _internal_err_response_wrapper(environ, start_response)
+ prb = AuthProblemDetails(500, 'Internal error.', req.path)
+ return \
+ _internal_err_response_wrapper(environ,
+ start_response, prb.serialize())
port_external = 30205
port = port_internal if host_external is None or host_external == '' \
else port_external
- return f"http://{host}:{port}"
+ return f"https://{host}:{port}"
def get_root_api_base():
def get_dms_support_profiles():
profiles = config.conf.API.DMS_SUPPORT_PROFILES
- profiles = profiles.replace("'", "").replace(
- '"', "").replace('[', "").replace(']', "")
- profiles = profiles.split(',')
+ if profiles is None or profiles == '':
+ profiles = []
+ elif "[" in profiles and "]" in profiles:
+ profiles = profiles.replace("'", "").replace(
+ '"', "").replace('[', "").replace(']', "")
+ profiles = profiles.split(',')
if 'native_k8sapi' not in profiles:
profiles.append('native_k8sapi')
return profiles
--- /dev/null
+# Copyright (C) 2021-2022 Wind River Systems, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from sqlalchemy.sql.elements import ColumnElement
+from sqlalchemy import or_
+
+from o2common.helper import o2logging
+logger = o2logging.get_logger(__name__)
+
+
+def gen_orm_filter(obj: ColumnElement, filter_str: str):
+ if not filter_str:
+ return []
+ filter_without_space = filter_str.replace(" ", "")
+ items = filter_without_space.split(';')
+
+ filter_list = list()
+ for i in items:
+ if '(' in i:
+ i = i.replace("(", "")
+ if ')' in i:
+ i = i.replace(")", "")
+ filter_expr = i.split(',')
+ if len(filter_expr) < 3:
+ continue
+ filter_op = filter_expr[0]
+ filter_key = filter_expr[1]
+ filter_vals = filter_expr[2:]
+ filter_list.extend(toFilterArgs(
+ filter_op, obj, filter_key, filter_vals))
+ logger.info('Filter list length: %d' % len(filter_list))
+ return filter_list
+
+
+def toFilterArgs(operation: str, obj: ColumnElement, key: str, values: list):
+ if not hasattr(obj, key):
+ logger.warning('Filter attrName %s not in Object %s.' %
+ (key, str(obj)))
+ raise KeyError(
+ 'Filter attrName {} not in the Object'.format(key))
+
+ if operation in ['eq', 'neq', 'gt', 'lt', 'gte', 'lte']:
+ if len(values) != 1:
+ raise KeyError(
+ 'Filter operation one {} is only support one value.'.
+ format(operation))
+ elif operation in ['in', 'nin', 'cont', 'ncont']:
+ if len(values) == 0:
+ raise KeyError('Filter operation {} value is needed.'.
+ format(operation))
+ else:
+ raise KeyError('Filter operation {} not support.'.format(operation))
+
+ ll = list()
+ if operation == 'eq':
+ val = values[0]
+ if val.lower() == 'null':
+ val = None
+ ll.append(getattr(obj, key) == val)
+ elif operation == 'neq':
+ val = values[0]
+ if val.lower() == 'null':
+ val = None
+ ll.append(getattr(obj, key) != val)
+ elif operation == 'gt':
+ val = values[0]
+ ll.append(getattr(obj, key) > val)
+ elif operation == 'lt':
+ val = values[0]
+ ll.append(getattr(obj, key) < val)
+ elif operation == 'gte':
+ val = values[0]
+ ll.append(getattr(obj, key) >= val)
+ elif operation == 'lte':
+ val = values[0]
+ ll.append(getattr(obj, key) <= val)
+ elif operation == 'in':
+ ll.append(getattr(obj, key).in_(values))
+ elif operation == 'nin':
+ ll.append(~getattr(obj, key).in_(values))
+ elif operation == 'cont':
+ val_list = list()
+ for val in values:
+ val_list.append(getattr(obj, key).contains(val))
+ ll.append(or_(*val_list))
+ elif operation == 'ncont':
+ val_list = list()
+ for val in values:
+ val_list.append(getattr(obj, key).contains(val))
+ ll.append(~or_(*val_list))
+ return ll
# See the License for the specific language governing permissions and
# limitations under the License.
+import re
from sqlalchemy.sql.elements import ColumnElement
-from sqlalchemy import or_
from o2common.views.route_exception import BadRequestException
+from o2common.domain.filter import gen_orm_filter
from o2common.helper import o2logging
logger = o2logging.get_logger(__name__)
def gen_filter(obj: ColumnElement, filter_str: str):
- if filter_str == '':
- return []
+ check_filter(obj, filter_str)
+ try:
+ filter_list = gen_orm_filter(obj, filter_str)
+ except KeyError as e:
+ raise BadRequestException(e.args[0])
+ return filter_list
+
+
+# The regular expressions testing example put on here
+# (neq,testkey,value-1)
+# (neq,testkey,value-1,value-2)
+# (gt,hello,1)
+# (gte,world,2)
+# (lt,testlt,notint)
+# (ncont,key1,v1,v_2)
+# (gt,hello,1);(ncont,world,val1,val-2)
+# (eq,wrong,60cba7be-e2cd-3b8c-a7ff-16e0f10573f9)
+# (eq,description,value key)
+def check_filter(obj: ColumnElement, filter_str: str):
+ if not filter_str:
+ return
+ pattern = r'^(\((eq|neq|gt|lt|gte|lte){1},\w+,[\w -\.]+\)\;?|' +\
+ r'\((in|nin|cont|ncont){1},\w*(,[\w -\.]*)*\)\;?)+'
+ result = re.match(pattern, filter_str)
+ logger.warning('filter: {} match result is {}'.format(filter_str, result))
+ if not result:
+ raise BadRequestException(
+ 'filter value formater not correct.')
+ check_filter_attribute(obj, filter_str)
+
+
+def check_filter_attribute(obj: ColumnElement, filter_str: str):
filter_without_space = filter_str.replace(" ", "")
items = filter_without_space.split(';')
- filter_list = list()
for i in items:
if '(' in i:
i = i.replace("(", "")
i = i.replace(")", "")
filter_expr = i.split(',')
if len(filter_expr) < 3:
+ raise BadRequestException(
+ 'Filter {} formater not correct.'.format(i))
continue
- filter_op = filter_expr[0]
+ # filter_op = filter_expr[0]
filter_key = filter_expr[1]
- filter_vals = filter_expr[2:]
- filter_list.extend(toFilterArgs(
- filter_op, obj, filter_key, filter_vals))
- logger.info('Filter list length: %d' % len(filter_list))
- return filter_list
-
-
-def toFilterArgs(operation: str, obj: ColumnElement, key: str, values: list):
- if not hasattr(obj, key):
- logger.warning('Filter attrName %s not in Object %s.' %
- (key, str(obj)))
- raise BadRequestException(
- 'Filter attrName {} not in the Object'.format(key))
-
- if operation in ['eq', 'neq', 'gt', 'lt', 'gte', 'lte']:
- if len(values) != 1:
- raise KeyError('Filter operation one is only support one value.')
- elif operation in ['in', 'nin', 'cont', 'ncont']:
- if len(values) == 0:
- raise KeyError('Filter operation value is needed.')
- else:
- raise KeyError('Filter operation value not support.')
-
- ll = list()
- if operation == 'eq':
- val = values[0]
- if val.lower() == 'null':
- val = None
- ll.append(getattr(obj, key) == val)
- elif operation == 'neq':
- val = values[0]
- if val.lower() == 'null':
- val = None
- ll.append(getattr(obj, key) != val)
- elif operation == 'gt':
- val = values[0]
- ll.append(getattr(obj, key) > val)
- elif operation == 'lt':
- val = values[0]
- ll.append(getattr(obj, key) < val)
- elif operation == 'gte':
- val = values[0]
- ll.append(getattr(obj, key) >= val)
- elif operation == 'lte':
- val = values[0]
- ll.append(getattr(obj, key) <= val)
- elif operation == 'in':
- ll.append(getattr(obj, key).in_(values))
- elif operation == 'nin':
- ll.append(~getattr(obj, key).in_(values))
- elif operation == 'cont':
- val_list = list()
- for val in values:
- val_list.append(getattr(obj, key).contains(val))
- ll.append(or_(*val_list))
- elif operation == 'ncont':
- val_list = list()
- for val in values:
- val_list.append(getattr(obj, key).contains(val))
- ll.append(~or_(*val_list))
- return ll
+ # filter_vals = filter_expr[2:]
+ if not hasattr(obj, filter_key):
+ raise BadRequestException(
+ 'Filter attrName {} not in the Object'.format(filter_key))
return self.driver.getPserver(id)
def _list(self, **filters) -> List[ocloudModel.StxGenericModel]:
- filters['resourcepoolid']
return self.driver.getPserverList(**filters)
def _set_stx_client(self):
os_client_args = config.get_stx_access_info(
region_name=subcloud[0].name,
subcloud_hostname=subcloud[0].oam_floating_ip)
- logger.warning(os_client_args)
+ logger.info(os_client_args)
config_client = get_stx_client(**os_client_args)
except EndpointException as e:
msg = e.format_message()
os_client_args = config.get_stx_access_info(
region_name=subcloud[0].name, sub_is_https=True,
subcloud_hostname=subcloud[0].oam_floating_ip)
- logger.warning(os_client_args)
+ logger.info(os_client_args)
config_client = get_stx_client(**os_client_args)
else:
raise ValueError('Stx endpoint exception: %s' % msg)
- else:
+ except Exception:
raise ValueError('cgtsclient get subcloud client failed')
+
return config_client
def setStxClient(self, resource_pool_id):
if systems[0].distributed_cloud_role is None or \
systems[0].distributed_cloud_role != 'systemcontroller':
return [ocloudModel.StxGenericModel(
- ResourceTypeEnum.RESOURCE_POOL, systems[0])]
+ ResourceTypeEnum.RESOURCE_POOL,
+ self._respoolconverter(systems[0]))]
pools = []
if config.get_system_controller_as_respool():
return [ocloudModel.StxGenericModel(
ResourceTypeEnum.RESOURCE_POOL,
- respool) for respool in pools if respool]
+ self._respoolconverter(
+ respool)) for respool in pools if respool]
def getResourcePoolDetail(self, id):
self.setStxClient(id)
systems = self.stxclient.isystem.list()
logger.debug('systems:' + str(systems[0].to_dict()))
return ocloudModel.StxGenericModel(
- ResourceTypeEnum.RESOURCE_POOL, systems[0]) if systems else None
+ ResourceTypeEnum.RESOURCE_POOL,
+ self._respoolconverter(systems[0])) if systems else None
def getPserverList(self, **filters) -> List[ocloudModel.StxGenericModel]:
hosts = self.stxclient.ihost.list()
return [ocloudModel.StxGenericModel(
ResourceTypeEnum.PSERVER, self._hostconverter(host))
for host in hosts if host and (host.availability == 'available'
+ or host.availability == 'online'
or host.availability == 'degraded')]
def getPserver(self, id) -> ocloudModel.StxGenericModel:
'more than one system exists in the account.')
return isystems[0]
+ @ staticmethod
+ def _respoolconverter(res_pool):
+ setattr(res_pool, 'name', res_pool.region_name)
+ return res_pool
+
@ staticmethod
def _hostconverter(host):
setattr(host, 'name', host.hostname)
def create_by(stxobj: StxGenericModel) -> Ocloud:
- imsendpoint = config.get_api_url() + config.get_o2ims_api_base() + '/'
+ imsendpoint = config.get_api_url()
globalcloudId = conf.DEFAULT.ocloud_global_id
description = "An ocloud"
ocloud = Ocloud(stxobj.id, stxobj.name, imsendpoint,
# See the License for the specific language governing permissions and
# limitations under the License.
-import json
# import redis
# import requests
+import ssl
+import json
from urllib.parse import urlparse
# from o2common.config import config
+from o2common.domain.filter import gen_orm_filter
from o2common.service.unit_of_work import AbstractUnitOfWork
-from o2ims.domain import commands
-from o2ims.domain.alarm_obj import AlarmSubscription, AlarmEvent2SMO
-import ssl
from o2common.service.command.handler import get_https_conn_default
from o2common.service.command.handler import get_http_conn
from o2common.service.command.handler import get_https_conn_selfsigned
from o2common.service.command.handler import post_data
+
+from o2ims.domain import commands
+from o2ims.domain.alarm_obj import AlarmSubscription, AlarmEvent2SMO, \
+ AlarmEventRecord
+
from o2common.helper import o2logging
logger = o2logging.get_logger(__name__)
cmd: commands.PubAlarm2SMO,
uow: AbstractUnitOfWork,
):
- logger.info('In notify_alarm_to_smo')
+ logger.debug('In notify_alarm_to_smo')
data = cmd.data
with uow:
subs = uow.alarm_subscriptions.list()
logger.debug('Alarm Subscription: {}'.format(
sub_data['alarmSubscriptionId']))
+ alarm = uow.alarm_event_records.get(data.id)
+ if alarm is None:
+ logger.debug('Alarm Event {} does not exists.'.format(data.id))
+ continue
+ if sub_data.get('filter', None):
+ try:
+ args = gen_orm_filter(AlarmEventRecord, sub_data['filter'])
+ except KeyError:
+ logger.warning(
+ 'Alarm Subscription {} filter {} has wrong attribute '
+ 'name or value. Ignore the filter'.format(
+ sub_data['alarmSubscriptionId'],
+ sub_data['filter']))
+ callback_smo(sub, data)
+ continue
+ args.append(AlarmEventRecord.alarmEventRecordId == data.id)
+ ret = uow.alarm_event_records.list_with_count(*args)
+ if ret[0] != 0:
+ logger.debug(
+ 'Alarm Event {} skip for subscription {} because of '
+ 'the filter.'
+ .format(data.id, sub_data['alarmSubscriptionId']))
+ continue
+
callback_smo(sub, data)
return
logger.error('Notify alarm Response code is: {}'.format(status))
except ssl.SSLCertVerificationError as e:
- logger.info(
- 'Notify alarm post data with trusted ca failed: {}'.format(e))
+ logger.debug(
+ 'Notify alarm try to post data with trusted ca \
+ failed: {}'.format(e))
if 'self signed' in str(e):
conn = get_https_conn_selfsigned(o.netloc)
try:
# See the License for the specific language governing permissions and
# limitations under the License.
-import json
# import redis
# import requests
+import json
+import ssl
from urllib.parse import urlparse
# from o2common.config import config
+from o2common.domain.filter import gen_orm_filter
from o2common.service.unit_of_work import AbstractUnitOfWork
-from o2ims.domain import commands
-from o2ims.domain.subscription_obj import Subscription, Message2SMO
from o2common.service.command.handler import get_https_conn_default
from o2common.service.command.handler import get_http_conn
from o2common.service.command.handler import get_https_conn_selfsigned
from o2common.service.command.handler import post_data
-import ssl
+
+from o2ims.domain import commands, ocloud
+from o2ims.domain.subscription_obj import Subscription, Message2SMO
+
from o2common.helper import o2logging
logger = o2logging.get_logger(__name__)
cmd: commands.PubMessage2SMO,
uow: AbstractUnitOfWork,
):
- logger.info('In notify_change_to_smo')
+ logger.debug('In notify_change_to_smo')
data = cmd.data
with uow:
subs = uow.subscriptions.list()
for sub in subs:
sub_data = sub.serialize()
logger.debug('Subscription: {}'.format(sub_data['subscriptionId']))
-
- try:
- resource_filter = json.loads(sub_data['filter'])
- if len(resource_filter) > 0:
- resource = uow.resources.get(data.id)
- logger.debug(type(resource))
- if resource: # TODO deal with resource is empty
- res_type_id = resource.serialize()['resourceTypeId']
- resourcetype = uow.resource_types.get(res_type_id)
- logger.debug(resourcetype.name)
- if resourcetype.name not in resource_filter:
- continue
- except json.decoder.JSONDecodeError as err:
- logger.warning(
- 'subscription filter decode json failed: {}'.format(err))
+ resource = uow.resources.get(data.id)
+ if resource is None:
+ logger.debug('Resource {} does not exists.'.format(data.id))
+ continue
+ res_pool_id = resource.serialize()['resourcePoolId']
+ logger.debug('res pool id is {}'.format(res_pool_id))
+ if sub_data.get('filter', None):
+ try:
+ args = gen_orm_filter(ocloud.Resource, sub_data['filter'])
+ except KeyError:
+ logger.error(
+ 'Subscription {} filter {} has wrong attribute name '
+ 'or value. Ignore the filter.'.format(
+ sub_data['subscriptionId'], sub_data['filter']))
+ callback_smo(sub, data)
+ continue
+ args.append(ocloud.Resource.resourceId == data.id)
+ ret = uow.resources.list_with_count(res_pool_id, *args)
+ if ret[0] != 0:
+ logger.debug(
+ 'Resource {} skip for subscription {} because of the '
+ 'filter.'
+ .format(data.id, sub_data['subscriptionId']))
+ continue
callback_smo(sub, data)
return
logger.error('Notify Response code is: {}'.format(status))
except ssl.SSLCertVerificationError as e:
- logger.info('Notify post data with trusted ca failed: {}'.format(e))
+ logger.debug(
+ 'Notify try to post data with trusted ca failed: {}'.format(e))
if 'self signed' in str(e):
conn = get_https_conn_selfsigned(o.netloc)
try:
try:
return post_data(conn, o.path, callback_data)
except ssl.SSLCertVerificationError as e:
- logger.info('Register to smo with trusted ca failed: {}'.format(e))
+ logger.debug('Try to register to smo with \
+ trusted ca failed: {}'.format(e))
if 'self signed' in str(e):
conn = get_https_conn_selfsigned(o.netloc)
try:
from o2common.service.messagebus import MessageBus
from o2common.views.pagination_route import link_header, PAGE_PARAM
-from o2common.views.route_exception import NotFoundException
+from o2common.views.route_exception import NotFoundException, \
+ BadRequestException
from o2ims.views import alarm_view
from o2ims.views.api_ns import api_ims_monitoring as api_monitoring_v1
from o2ims.views.alarm_dto import AlarmDTO, SubscriptionDTO
mask='{alarmSubscriptionId,callback,consumerSubscriptionId,filter}')
def post(self):
data = api_monitoring_v1.payload
+ callback = data.get('callback', None)
+ if not callback:
+ raise BadRequestException('The callback parameter is required')
+
result = alarm_view.subscription_create(data, bus.uow)
return result, 201
import uuid as uuid
from o2common.service import unit_of_work
+from o2common.views.view import gen_filter, check_filter
from o2common.views.pagination_view import Pagination
-from o2common.views.view import gen_filter
+from o2common.views.route_exception import BadRequestException
+
from o2ims.views.alarm_dto import SubscriptionDTO
from o2ims.domain.alarm_obj import AlarmSubscription, AlarmEventRecord
def subscription_create(subscriptionDto: SubscriptionDTO.subscription_create,
uow: unit_of_work.AbstractUnitOfWork):
+ filter = subscriptionDto.get('filter', '')
+ consumer_subs_id = subscriptionDto.get('consumerSubscriptionId', '')
+
+ check_filter(AlarmEventRecord, filter)
sub_uuid = str(uuid.uuid4())
subscription = AlarmSubscription(
sub_uuid, subscriptionDto['callback'],
- subscriptionDto['consumerSubscriptionId'],
- subscriptionDto['filter'])
+ consumer_subs_id, filter)
with uow:
+ args = list()
+ args.append(getattr(AlarmSubscription, 'callback')
+ == subscriptionDto['callback'])
+ args.append(getattr(AlarmSubscription, 'filter') == filter)
+ args.append(getattr(AlarmSubscription,
+ 'consumerSubscriptionId') == consumer_subs_id)
+ count, _ = uow.alarm_subscriptions.list_with_count(*args)
+ if count > 0:
+ raise BadRequestException("The value of parameters is duplicated")
uow.alarm_subscriptions.add(subscription)
uow.commit()
first = uow.alarm_subscriptions.get(sub_uuid)
'helmcli_kubeconfig': fields.String(attribute='helmcli_kubeconfig'),
})
+ extensions = api_ims_inventory_v1.model("DeploymentManagerExtensions", {
+ 'profileName': fields.String,
+ 'profileData': fields.Nested(profile, False, True),
+ })
+
deployment_manager_get = api_ims_inventory_v1.model(
"DeploymentManagerGetDto",
{
# 'supportedLocations': fields.String,
# 'capabilities': fields.String,
# 'capacity': fields.String,
- 'profileName': fields.String,
- 'profileData': fields.Nested(profile, False, True),
- 'extensions': fields.String
+ 'extensions': fields.Nested(extensions, True, True)
},
mask='{deploymentManagerId,name,description,oCloudId,serviceUri,' +\
- 'profileName,profileData}'
+ 'extensions/profileName,extensions/profileData}'
)
from o2common.service.messagebus import MessageBus
from o2common.views.pagination_route import link_header, PAGE_PARAM
-from o2common.views.route_exception import NotFoundException
+from o2common.views.route_exception import NotFoundException, \
+ BadRequestException
from o2ims.domain import ocloud
from o2ims.views import ocloud_view
from o2ims.views.api_ns import api_ims_inventory as api_ims_inventory_v1
mask='{subscriptionId,callback,consumerSubscriptionId,filter}')
def post(self):
data = api_ims_inventory_v1.payload
+ callback = data.get('callback', None)
+ if not callback:
+ raise BadRequestException('The callback parameter is required')
+
result = ocloud_view.subscription_create(data, bus.uow)
return result, 201
from o2common.service import unit_of_work
from o2common.config import config
+from o2common.views.view import gen_filter, check_filter
from o2common.views.pagination_view import Pagination
-from o2common.views.view import gen_filter
+from o2common.views.route_exception import BadRequestException
+
from o2ims.domain import ocloud
from o2ims.views.ocloud_dto import SubscriptionDTO
from o2ims.domain.subscription_obj import Subscription
return None
profile_data = result.pop("profile", None)
- result['profileName'] = profile
profiles = config.get_dms_support_profiles()
if profile not in profiles:
return ""
+ extensions = {
+ 'profileName': profile
+ }
if ocloud.DeploymentManagerProfileDefault == profile \
or ocloud.DeploymentManagerProfileSOL018 == profile:
result['serviceUri'] = \
profile_data['cluster_api_endpoint']
- result['profileData'] = profile_data
+ extensions['profileData'] = profile_data
elif ocloud.DeploymentManagerProfileSOL018HelmCLI == profile:
result['serviceUri'] = \
profile_data['cluster_api_endpoint']
config.get_helmcli_access()
helmcli_profile["helmcli_kubeconfig"] = _gen_kube_config(
deploymentManagerId, profile_data)
- result['profileData'] = helmcli_profile
+ extensions['profileData'] = helmcli_profile
else:
return ""
+ result['extensions'] = extensions
return result
def subscription_create(subscriptionDto: SubscriptionDTO.subscription_create,
uow: unit_of_work.AbstractUnitOfWork):
+ filter = subscriptionDto.get('filter', '')
+ consumer_subs_id = subscriptionDto.get('consumerSubscriptionId', '')
+
+ check_filter(ocloud.Resource, filter)
sub_uuid = str(uuid.uuid4())
subscription = Subscription(
sub_uuid, subscriptionDto['callback'],
- subscriptionDto['consumerSubscriptionId'],
- subscriptionDto['filter'])
+ consumer_subs_id, filter)
with uow:
+ args = list()
+ args.append(getattr(Subscription, 'callback')
+ == subscriptionDto['callback'])
+ args.append(getattr(Subscription, 'filter') == filter)
+ args.append(getattr(Subscription,
+ 'consumerSubscriptionId') == consumer_subs_id)
+ count, _ = uow.subscriptions.list_with_count(*args)
+ if count > 0:
+ raise BadRequestException("The value of parameters is duplicated")
uow.subscriptions.add(subscription)
uow.commit()
first = uow.subscriptions.get(sub_uuid)
assert resp.status_code == 404
-def test_flask_post(mock_flask_uow):
+def test_flask_post(mock_flask_uow, mappers):
session, app = mock_flask_uow
apibase = config.get_o2ims_monitoring_api_base() + '/v1'
+ order_by = MagicMock()
+ order_by.count.return_value = 0
+ order_by.limit.return_value.offset.return_value = []
+ session.return_value.query.return_value.filter.return_value.\
+ order_by.return_value = order_by
+
with app.test_client() as client:
session.return_value.execute.return_value = []
resp = client.post(apibase+'/alarmSubscriptions', json={
'callback': sub_callback,
'consumerSubscriptionId': 'consumerSubId1',
- 'filter': 'empty'
+ 'filter': '(eq,resourceTypeId,xxx)'
})
assert resp.status_code == 201
assert 'alarmSubscriptionId' in resp.get_json()
deployment_manager_id1, uow, profile=profileName)
assert str(deployment_manager_res.get(
'serviceUri')) == cluster_endpoint
- assert str(deployment_manager_res.get(
+ assert str(deployment_manager_res.get("extensions").get(
"profileName")) == profileName
# profile wrong name
assert resp.status_code == 404
-def test_flask_post(mock_flask_uow):
+def test_flask_post(mock_flask_uow, mappers):
session, app = mock_flask_uow
apibase = config.get_o2ims_api_base() + '/v1'
+ order_by = MagicMock()
+ order_by.count.return_value = 0
+ order_by.limit.return_value.offset.return_value = []
+ session.return_value.query.return_value.filter.return_value.\
+ order_by.return_value = order_by
+
with app.test_client() as client:
session.return_value.execute.return_value = []
resp = client.post(apibase+'/subscriptions', json={
'callback': sub_callback,
'consumerSubscriptionId': 'consumerSubId1',
- 'filter': 'empty'
+ 'filter': '(eq,resourceTypeId,xxx)'
})
assert resp.status_code == 201
assert 'subscriptionId' in resp.get_json()