smo_register_url = http://127.0.0.1:8090/register
smo_token_data = smo_token_payload
+[OCLOUD]
+OS_AUTH_URL =
+OS_USERNAME =
+OS_PASSWORD =
+API_HOST_EXTERNAL_FLOATING =
+
[API]
+# support native_k8sapi,sol018,sol018_helmcli
+# if the value is black, then native_k8sapi will set by default
+DMS_SUPPORT_PROFILES =
[WATCHER]
smo_register_url = http://127.0.0.1:8090/register
smo_token_data = ${SMO_TOKEN_DATA}
+ [OCLOUD]
+ OS_AUTH_URL: ${OS_AUTH_URL}
+ OS_USERNAME: ${OS_USERNAME}
+ OS_PASSWORD: ${OS_PASSWORD}
+ API_HOST_EXTERNAL_FLOATING: ${API_HOST_EXTERNAL_FLOATING}
+
[API]
[WATCHER]
pullPolicy: IfNotPresent
logginglevel: "DEBUG"
- ocloud:
- OS_AUTH_URL: "${OS_AUTH_URL}"
- OS_USERNAME: "${OS_USERNAME}"
- OS_PASSWORD: "${OS_PASSWORD}"
- API_HOST_EXTERNAL_FLOATING: "${API_HOST_EXTERNAL_FLOATING}"
-
applicationconfig: ${applicationconfig}
servercrt: ${servercrt}
serverkey: ${serverkey}
from o2app import bootstrap
from o2ims.views import configure_namespace as ims_route_configure_namespace
+from o2common.views.route_exception import configure_exception
from o2ims.adapter.clients.alarm_dict_client import load_alarm_definition
from o2common.authmw import authmiddleware
app.config.SWAGGER_UI_DOC_EXPANSION = 'list'
# app.config['RESTX_MASK_HEADER'] = 'fields'
app.config['RESTX_MASK_SWAGGER'] = False
+app.config['ERROR_INCLUDE_MESSAGE'] = False
api = Api(app, version=FLASK_API_VERSION,
title='INF O2 Services API',
description='Swagger OpenAPI document for the INF O2 Services',
)
bus = bootstrap.bootstrap()
+configure_exception(api)
ims_route_configure_namespace(api)
load_alarm_definition(bus.uow)
import sys
from urllib.parse import urlparse
+from o2common import config
from o2common.helper import o2logging
logger = o2logging.get_logger(__name__)
def get_api_url():
host_interal = os.environ.get("API_HOST", "localhost")
host_external = os.environ.get("API_HOST_EXTERNAL_FLOATING")
+ if config.conf.OCLOUD.API_HOST_EXTERNAL_FLOATING is not None and \
+ config.conf.OCLOUD.API_HOST_EXTERNAL_FLOATING != '':
+ host_external = config.conf.OCLOUD.API_HOST_EXTERNAL_FLOATING
host = host_interal if host_external is None or host_external == '' \
else host_external
return smo_o2endpoint
+def get_stx_client_args():
+ client_args = dict(
+ auth_url=os.environ.get('OS_AUTH_URL', _DEFAULT_STX_URL),
+ username=os.environ.get('OS_USERNAME', "admin"),
+ api_key=os.environ.get('OS_PASSWORD', "fakepasswd1"),
+ project_name=os.environ.get('OS_PROJECT_NAME', "admin"),
+ )
+ if config.conf.OCLOUD.OS_AUTH_URL is not None and \
+ config.conf.OCLOUD.OS_AUTH_URL != '':
+ client_args['auth_url'] = config.conf.OCLOUD.OS_AUTH_URL
+ if config.conf.OCLOUD.OS_USERNAME is not None and \
+ config.conf.OCLOUD.OS_USERNAME != '':
+ client_args['username'] = config.conf.OCLOUD.OS_USERNAME
+ if config.conf.OCLOUD.OS_PASSWORD is not None and \
+ config.conf.OCLOUD.OS_PASSWORD != '':
+ client_args['api_key'] = config.conf.OCLOUD.OS_PASSWORD
+ if config.conf.OCLOUD.OS_PROJECT_NAME is not None and \
+ config.conf.OCLOUD.OS_PROJECT_NAME != '':
+ client_args['project_name'] = config.conf.OCLOUD.OS_PROJECT_NAME
+ return client_args
+
+
def get_stx_access_info(region_name="RegionOne", subcloud_hostname: str = "",
sub_is_https: bool = False):
# authurl = os.environ.get("STX_AUTH_URL", "http://192.168.204.1:5000/v3")
# pswd = os.environ.get("STX_PASSWORD", "passwd1")
# stx_access_info = (authurl, username, pswd)
try:
- client_args = dict(
- auth_url=os.environ.get('OS_AUTH_URL', _DEFAULT_STX_URL),
- username=os.environ.get('OS_USERNAME', "admin"),
- api_key=os.environ.get('OS_PASSWORD', "fakepasswd1"),
- project_name=os.environ.get('OS_PROJECT_NAME', "admin"),
- )
+ # client_args = dict(
+ # auth_url=os.environ.get('OS_AUTH_URL', _DEFAULT_STX_URL),
+ # username=os.environ.get('OS_USERNAME', "admin"),
+ # api_key=os.environ.get('OS_PASSWORD', "fakepasswd1"),
+ # project_name=os.environ.get('OS_PROJECT_NAME', "admin"),
+ # )
+ client_args = get_stx_client_args()
except KeyError:
logger.error('Please source your RC file before execution, '
'e.g.: `source ~/downloads/admin-rc.sh`')
def get_dc_access_info():
try:
- client_args = dict(
- auth_url=os.environ.get('OS_AUTH_URL', _DEFAULT_STX_URL),
- username=os.environ.get('OS_USERNAME', "admin"),
- api_key=os.environ.get('OS_PASSWORD', "fakepasswd1"),
- project_name=os.environ.get('OS_PROJECT_NAME', "admin"),
- )
+ # client_args = dict(
+ # auth_url=os.environ.get('OS_AUTH_URL', _DEFAULT_STX_URL),
+ # username=os.environ.get('OS_USERNAME', "admin"),
+ # api_key=os.environ.get('OS_PASSWORD', "fakepasswd1"),
+ # project_name=os.environ.get('OS_PROJECT_NAME', "admin"),
+ # )
+ client_args = get_stx_client_args()
except KeyError:
logger.error('Please source your RC file before execution, '
'e.g.: `source ~/downloads/admin-rc.sh`')
def get_fm_access_info():
try:
- client_args = dict(
- auth_url=os.environ.get('OS_AUTH_URL', _DEFAULT_STX_URL),
- username=os.environ.get('OS_USERNAME', "admin"),
- api_key=os.environ.get('OS_PASSWORD', "fakepasswd1"),
- project_name=os.environ.get('OS_PROJECT_NAME', "admin"),
- )
+ # client_args = dict(
+ # auth_url=os.environ.get('OS_AUTH_URL', _DEFAULT_STX_URL),
+ # username=os.environ.get('OS_USERNAME', "admin"),
+ # api_key=os.environ.get('OS_PASSWORD', "fakepasswd1"),
+ # project_name=os.environ.get('OS_PROJECT_NAME', "admin"),
+ # )
+ client_args = get_stx_client_args()
except KeyError:
logger.error('Please source your RC file before execution, '
'e.g.: `source ~/downloads/admin-rc.sh`')
def get_helmcli_access():
host_external = os.environ.get("API_HOST_EXTERNAL_FLOATING")
+ if config.conf.OCLOUD.API_HOST_EXTERNAL_FLOATING is not None and \
+ config.conf.OCLOUD.API_HOST_EXTERNAL_FLOATING != '':
+ host_external = config.conf.OCLOUD.API_HOST_EXTERNAL_FLOATING
host = "127.0.0.1" if host_external is None or host_external == '' \
else host_external
port = "10022" if host_external is None or host_external == '' \
return events_yaml_name
return "/configs/events.yaml"
-# get k8s host from env:
-
+# get k8s host from env:
def get_k8s_host():
k8s_host = os.environ.get("KUBERNETES_SERVICE_HOST")
if k8s_host is None:
raise Exception('Get k8s host failed.')
return k8s_host
-# get k8s host port from env:
-
+# get k8s host port from env:
def get_k8s_port():
k8s_port = os.environ.get("KUBERNETES_SERVICE_PORT_HTTPS", '443')
return k8s_port
-# token review url
-
+# token review url
def get_review_url():
try:
api = '/apis/authentication.k8s.io/v1/tokenreviews'
except Exception:
raise Exception('Get k8s review url failed')
-# get reviewer token
-
+# get reviewer token
def get_reviewer_token():
# token path default is below.
token_path = '/var/run/secrets/kubernetes.io/serviceaccount/token'
def get_auth_provider():
return 'k8s'
+
+
+def get_dms_support_profiles():
+ profiles = config.conf.API.DMS_SUPPORT_PROFILES
+ profiles = profiles.replace("'", "").replace(
+ '"', "").replace('[', "").replace(']', "")
+ profiles = profiles.split(',')
+ if 'native_k8sapi' not in profiles:
+ profiles.append('native_k8sapi')
+ return profiles
# limitations under the License.
from urllib.parse import urlparse
-from flask import abort
+
+from o2common.views.route_exception import BadRequestException
from o2common.helper import o2logging
logger = o2logging.get_logger(__name__)
page_current = ret.pop('page_current')
if page_current > page_total:
- abort(400, "Page size {} bad request.".format(page_current))
+ raise BadRequestException(
+ "Page size {} bad request.".format(page_current))
if 0 == count:
return [], {'X-Total-Count': count}
from flask_restx.fields import List, Nested, String
from flask_restx.utils import unpack
+from o2common.views.route_exception import BadRequestException
+
from o2common.helper import o2logging
logger = o2logging.get_logger(__name__)
mask_val = ''
if 'all_fields' in kwargs:
all_fields_without_space = kwargs['all_fields'].replace(" ", "")
- all_fields = all_fields_without_space.lower()
- if 'true' == all_fields:
- mask_val = ''
+ logger.info('all_fields selector value is {}'.format(
+ all_fields_without_space))
+ # all_fields = all_fields_without_space.lower()
+ # if 'true' == all_fields:
+ selector = self.__gen_selector_from_model_with_value(
+ self.fields)
+ mask_val = self.__gen_mask_from_selector(selector)
elif 'fields' in kwargs and kwargs['fields'] != '':
fields_without_space = kwargs['fields'].replace(" ", "")
selector = {}
self.__update_selector_value(selector, fields_without_space, True)
+ self.__set_default_mask(selector)
mask_val = self.__gen_mask_from_selector(selector)
self.__update_selector_value(
selector, exclude_fields_without_space, False)
+ self.__set_default_mask(selector)
mask_val = self.__gen_mask_from_selector(selector)
elif 'exclude_default' in kwargs and kwargs['exclude_default'] != '':
selector[i] = default_val
return selector
- def __update_selector_value(self, default_selector: dict, filter: str,
+ def __update_selector_value(self, selector: dict, filter: str,
val: bool):
fields = filter.split(',')
for f in fields:
if '/' in f:
- self.__update_selector_tree_value(default_selector, f, val)
+ self.__update_selector_tree_value(selector, f, val)
continue
- default_selector[f] = val
+ if f not in self.fields:
+ raise BadRequestException(
+ 'Selector attribute {} not found'.format(f))
+ selector[f] = val
def __update_selector_tree_value(self, m: dict, filter: str, val: bool):
filter_list = filter.split('/', 1)
mask_li.append(k)
return '{%s}' % ','.join(mask_li)
+
+ def __set_default_mask(self, selector: dict, val: bool = True):
+ default_selector = str(getattr(self.fields, "__mask__"))[1:-1]
+ self.__update_selector_value(selector, default_selector, val)
--- /dev/null
+# Copyright (C) 2021-2022 Wind River Systems, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from flask import request
+from flask_restx._http import HTTPStatus
+from werkzeug.exceptions import (
+ BadRequest,
+ MethodNotAllowed,
+ NotFound,
+ InternalServerError,
+)
+
+
+from o2common.helper import o2logging
+logger = o2logging.get_logger(__name__)
+
+
+class BadRequestException(BadRequest):
+ def __init__(self, desc=None, resp=None):
+ super().__init__(description=desc, response=resp)
+
+
+class NotFoundException(NotFound):
+ def __init__(self, desc=None, resp=None):
+ super().__init__(description=desc, response=resp)
+
+
+class ProblemDetails():
+ def __init__(self, code: int, detail: str,
+ title=None, instance=None
+ ) -> None:
+ self.status = code
+ self.detail = detail
+ self.type = request.path
+ self.title = title if title is not None else self.getTitle(code)
+ self.instance = instance if instance is not None else []
+
+ def getTitle(self, code):
+ return HTTPStatus(code).phrase
+
+ def serialize(self):
+ details = {}
+ for key in dir(self):
+ if key == 'ns' or key.startswith('__') or \
+ callable(getattr(self, key)):
+ continue
+ else:
+ details[key] = getattr(self, key)
+ return details
+
+
+def configure_exception(app):
+
+ @app.errorhandler(BadRequestException)
+ def handle_badrequest_exception(error):
+ '''Return a custom message and 400 status code'''
+ type(error)
+ problem = ProblemDetails(400, str(error))
+ return problem.serialize(), 400
+
+ @app.errorhandler(NotFoundException)
+ def handle_notfound_exception(error):
+ '''Return a custom message and 404 status code'''
+ problem = ProblemDetails(404, str(error))
+ return problem.serialize(), 404
+
+ @app.errorhandler(MethodNotAllowed)
+ def handle_methodnotallowed_exception(error):
+ '''Return a custom message and 405 status code'''
+ problem = ProblemDetails(405, "Method not allowed")
+ return problem.serialize(), 405
+
+ @app.errorhandler(InternalServerError)
+ def handle_internalservererror_exception(error):
+ '''Return a custom message and 500 status code'''
+ problem = ProblemDetails(500, "Internal Server Error")
+ return problem.serialize(), 500
from sqlalchemy.sql.elements import ColumnElement
from sqlalchemy import or_
+from o2common.views.route_exception import BadRequestException
+
from o2common.helper import o2logging
logger = o2logging.get_logger(__name__)
if not hasattr(obj, key):
logger.warning('Filter attrName %s not in Object %s.' %
(key, str(obj)))
- return []
+ raise BadRequestException(
+ 'Filter attrName {} not in the Object'.format(key))
if operation in ['eq', 'neq', 'gt', 'lt', 'gte', 'lte']:
if len(values) != 1:
import json
from o2common.domain.base import AgRoot, Serializer
-from o2common.config import conf as CONF
+from o2common.config import config, conf as CONF
# from dataclasses import dataclass
# from datetime import date
# from typing import Optional, List, Set
from .resource_type import ResourceKindEnum, ResourceTypeEnum
-# from uuid import UUID
-DeploymentManagerProfileDefault = 'default'
+DeploymentManagerProfileDefault = 'native_k8sapi'
DeploymentManagerProfileSOL018 = 'sol018'
DeploymentManagerProfileSOL018HelmCLI = 'sol018_helmcli'
d['profile'] = json.loads(d['profile'])
d['profileSupportList'] = [
DeploymentManagerProfileDefault,
- DeploymentManagerProfileSOL018,
- DeploymentManagerProfileSOL018HelmCLI,
]
+ profiles = config.get_dms_support_profiles()
+ for profile in profiles:
+ if profile == DeploymentManagerProfileSOL018:
+ d['profileSupportList'].append(profile)
+ elif profile == DeploymentManagerProfileSOL018HelmCLI:
+ d['profileSupportList'].append(profile)
return d
)
first = res.first()
if first is None:
- resourcetype_id = str(uuid.uuid4())
- uow.resource_types.add(ResourceType(
- resourcetype_id,
- 'pserver_mem', stxobj.type,
- resourcepool.oCloudId))
res_type_name = 'pserver_mem'
resourcetype_id = str(uuid.uuid3(
uuid.NAMESPACE_URL, res_type_name))
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
+from . import api_ns, ocloud_route, alarm_route
from o2common.config import config
-from . import ocloud_route, alarm_route
-from . import api_ns
from o2common.helper import o2logging
logger = o2logging.get_logger(__name__)
from o2common.service.messagebus import MessageBus
from o2common.views.pagination_route import link_header, PAGE_PARAM
+from o2common.views.route_exception import NotFoundException
from o2ims.views import alarm_view
from o2ims.views.api_ns import api_ims_monitoring as api_monitoring_v1
from o2ims.views.alarm_dto import AlarmDTO, SubscriptionDTO
model = AlarmDTO.alarm_event_record_get
- @api_monitoring_v1.doc('Get resource type')
+ @api_monitoring_v1.doc('Get AlarmEventRecord')
@api_monitoring_v1.marshal_with(model)
def get(self, alarmEventRecordId):
result = alarm_view.alarm_event_record_one(alarmEventRecordId, bus.uow)
if result is not None:
return result
- api_monitoring_v1.abort(
- 404, "Resource type {} doesn't exist".format(alarmEventRecordId))
+ raise NotFoundException(
+ "Alarm Event Record {} doesn't exist".format(alarmEventRecordId))
# ---------- Alarm Subscriptions ---------- #
alarmSubscriptionID, bus.uow)
if result is not None:
return result
- api_monitoring_v1.abort(404, "Subscription {} doesn't exist".format(
- alarmSubscriptionID))
+ raise NotFoundException(
+ "Subscription {} doesn't exist".format(alarmSubscriptionID))
@api_monitoring_v1.doc('Delete subscription by ID')
- @api_monitoring_v1.response(204, 'Subscription deleted')
+ @api_monitoring_v1.response(200, 'Subscription deleted')
def delete(self, alarmSubscriptionID):
result = alarm_view.subscription_delete(alarmSubscriptionID, bus.uow)
- return result, 204
+ return result, 200
# 'deploymentManagers': fields.String,
# 'smoRegistrationService': fields.String
'extensions': fields.String
- }
+ },
+ mask='{oCloudId,globalcloudId,name,description,serviceUri}'
)
# 'resourceKind': fields.String,
# 'resourceClass': fields.String,
'extensions': fields.String
- }
+ },
+ mask='{resourceTypeId,name,description,model,vendor,version}'
)
'location': fields.String,
# 'resources': fields.String,
'extensions': fields.String
- }
+ },
+ mask='{resourcePoolId,oCloudId,globalLocationId,name,description}'
)
'description': fields.String,
# 'elements': fields.String,
'extensions': fields.String
- }
+ },
+ mask='{resourceId,resourcePoolId,resourceTypeId,description,parentId}'
)
def recursive_resource_mapping(iteration_number=2):
fields.Nested(ResourceDTO.recursive_resource_mapping(
iteration_number-1)), attribute='children')
return api_ims_inventory_v1.model(
- 'ResourceGetDto' + str(iteration_number), resource_json_mapping)
+ 'ResourceGetDto' + str(iteration_number), resource_json_mapping,
+ mask='{resourceId,resourcePoolId,resourceTypeId,description,' +
+ 'parentId}')
class DeploymentManagerDTO:
description='Profile support list, use default for the return \
endpoint'),
'extensions': fields.String
- }
+ },
+ mask='{deploymentManagerId,name,description,oCloudId,serviceUri,' + \
+ 'profileSupportList}'
)
profile = api_ims_inventory_v1.model("DeploymentManagerGetDtoProfile", {
'profileName': fields.String,
'profileData': fields.Nested(profile, False, True),
'extensions': fields.String
- }
+ },
+ mask='{deploymentManagerId,name,description,oCloudId,serviceUri,' +\
+ 'profileName,profileData}'
)
'callback': fields.String,
'consumerSubscriptionId': fields.String,
'filter': fields.String,
- }
+ },
+ mask='{subscriptionId,callback}'
)
subscription = api_ims_inventory_v1.model(
from o2common.service.messagebus import MessageBus
from o2common.views.pagination_route import link_header, PAGE_PARAM
+from o2common.views.route_exception import NotFoundException
+from o2ims.domain import ocloud
from o2ims.views import ocloud_view
from o2ims.views.api_ns import api_ims_inventory as api_ims_inventory_v1
from o2ims.views.ocloud_dto import OcloudDTO, ResourceTypeDTO,\
res = ocloud_view.oclouds(bus.uow)
if len(res) > 0:
return res[0]
- api_ims_inventory_v1.abort(
- 404, "oCloud doesn't exist")
+ raise NotFoundException("oCloud doesn't exist")
# ---------- ResourceTypes ---------- #
result = ocloud_view.resource_type_one(resourceTypeID, bus.uow)
if result is not None:
return result
- api_ims_inventory_v1.abort(
- 404, "Resource type {} doesn't exist".format(resourceTypeID))
+ raise NotFoundException("Resource type {} doesn't exist".format(
+ resourceTypeID))
# ---------- ResourcePools ---------- #
result = ocloud_view.resource_pool_one(resourcePoolID, bus.uow)
if result is not None:
return result
- api_ims_inventory_v1.abort(
- 404, "Resource pool {} doesn't exist".format(resourcePoolID))
+ raise NotFoundException("Resource pool {} doesn't exist".format(
+ resourcePoolID))
# ---------- Resources ---------- #
result = ocloud_view.resource_one(resourceID, bus.uow)
if result is not None:
return result
- api_ims_inventory_v1.abort(
- 404, "Resource {} doesn't exist".format(resourceID))
+ raise NotFoundException("Resource {} doesn't exist".format(
+ resourceID))
# ---------- DeploymentManagers ---------- #
@api_ims_inventory_v1.route("/v1/deploymentManagers/<deploymentManagerID>")
@api_ims_inventory_v1.param('deploymentManagerID',
'ID of the deployment manager')
-@api_ims_inventory_v1.param('profile', 'DMS profile: value supports "sol018"',
- _in='query')
+@api_ims_inventory_v1.param(
+ 'profile', 'DMS profile: value supports "native_k8sapi"',
+ _in='query')
@api_ims_inventory_v1.response(404, 'Deployment manager not found')
@api_ims_inventory_v1.param(
'all_fields',
args = parser.parse_args()
profile = (
args.profile if args.profile is not None and args.profile != ''
- else 'default')
+ else ocloud.DeploymentManagerProfileDefault)
result = ocloud_view.deployment_manager_one(
deploymentManagerID, bus.uow, profile)
- if result is not None:
+ if result is not None and result != "":
return result
- api_ims_inventory_v1.abort(
- 404,
- "Deployment manager {} doesn't exist".format(deploymentManagerID))
+ elif result == "":
+ raise NotFoundException(
+ "Profile {} doesn't support".format(
+ args.profile))
+
+ raise NotFoundException("Deployment manager {} doesn't exist".format(
+ deploymentManagerID))
# ---------- Subscriptions ---------- #
subscriptionID, bus.uow)
if result is not None:
return result
- api_ims_inventory_v1.abort(404, "Subscription {} doesn't exist".format(
+ raise NotFoundException("Subscription {} doesn't exist".format(
subscriptionID))
@api_ims_inventory_v1.doc('Delete subscription by ID')
- @api_ims_inventory_v1.response(204, 'Subscription deleted')
+ @api_ims_inventory_v1.response(200, 'Subscription deleted')
def delete(self, subscriptionID):
result = ocloud_view.subscription_delete(subscriptionID, bus.uow)
- return result, 204
+ return result, 200
def deployment_manager_one(deploymentManagerId: str,
uow: unit_of_work.AbstractUnitOfWork,
- profile: str = 'default'):
+ profile: str =
+ ocloud.DeploymentManagerProfileDefault):
profile = profile.lower()
with uow:
first = uow.deployment_managers.get(deploymentManagerId)
profile_data = result.pop("profile", None)
result['profileName'] = profile
+ profiles = config.get_dms_support_profiles()
+ if profile not in profiles:
+ return ""
- if ocloud.DeploymentManagerProfileDefault == profile:
- pass
- elif ocloud.DeploymentManagerProfileSOL018 == profile:
+ if ocloud.DeploymentManagerProfileDefault == profile \
+ or ocloud.DeploymentManagerProfileSOL018 == profile:
result['serviceUri'] = \
profile_data['cluster_api_endpoint']
result['profileData'] = profile_data
deploymentManagerId, profile_data)
result['profileData'] = helmcli_profile
else:
- return None
+ return ""
return result
subscription_id1 = str(uuid.uuid4())
resp = client.delete(apibase+"/alarmSubscriptions/"+subscription_id1)
- assert resp.status_code == 204
+ assert resp.status_code == 200
def test_flask_not_allowed(mock_flask_uow):
from o2ims.domain import ocloud, subscription_obj
from o2ims.domain import resource_type as rt
from o2ims.views import ocloud_view
-from o2common.config import config
+from o2common.config import config, conf as CONF
def setup_ocloud():
"profile": {}
}
+ CONF.API.DMS_SUPPORT_PROFILES = 'native_k8sapi,sol018,sol018_helmcli'
+ cluster_endpoint = "https://test_k8s:6443"
+ session.return_value.query.return_value.filter_by.return_value.first.\
+ return_value.serialize.return_value['profile'] = {
+ "cluster_api_endpoint": cluster_endpoint
+ }
+
# profile default
deployment_manager_res = ocloud_view.deployment_manager_one(
deployment_manager_id1, uow)
assert str(deployment_manager_res.get(
"deploymentManagerId")) == deployment_manager_id1
assert str(deployment_manager_res.get(
- 'serviceUri')) == dms_endpoint
+ 'serviceUri')) == cluster_endpoint
assert deployment_manager_res.get('profile') is None
# profile sol018
profileName = ocloud.DeploymentManagerProfileSOL018
- cluster_endpoint = "https://test_k8s:6443"
session.return_value.query.return_value.filter_by.return_value.first.\
return_value.serialize.return_value['profile'] = {
"cluster_api_endpoint": cluster_endpoint
# profile wrong name
profileName = 'wrong_profile'
- session.return_value.query.return_value.filter_by.return_value.first.\
- return_value.serialize.return_value['profile'] = {
- "cluster_api_endpoint": cluster_endpoint
- }
deployment_manager_res = ocloud_view.deployment_manager_one(
deployment_manager_id1, uow, profile=profileName)
- assert deployment_manager_res is None
+ assert deployment_manager_res == ""
def test_view_subscriptions(mock_uow):
subscription_id1 = str(uuid.uuid4())
resp = client.delete(apibase+"/subscriptions/"+subscription_id1)
- assert resp.status_code == 204
+ assert resp.status_code == 200
def test_flask_not_allowed(mock_flask_uow):