[DEFAULT]
ocloud_global_id = 4e24b97c-8c49-4c4f-b53e-3de5235a4e37
-smo_url = http://127.0.0.1:8090/register
+smo_register_url = http://127.0.0.1:8090/register
[API]
test = "hello"
.ResourceSqlAlchemyRepository(self.session)
self.subscriptions = ocloud_repository\
.SubscriptionSqlAlchemyRepository(self.session)
- self.configurations = ocloud_repository\
- .ConfigurationSqlAlchemyRepository(self.session)
self.deployment_managers = ocloud_repository\
.DeploymentManagerSqlAlchemyRepository(self.session)
self.nfdeployment_descs = dms_repository\
for entry in self.subscriptions.seen:
while hasattr(entry, 'events') and len(entry.events) > 0:
yield entry.events.pop(0)
- for entry in self.configurations.seen:
- while hasattr(entry, 'events') and len(entry.events) > 0:
- yield entry.events.pop(0)
for entry in self.nfdeployment_descs.seen:
while hasattr(entry, 'events') and len(entry.events) > 0:
yield entry.events.pop(0)
# limitations under the License.
# import json
+
import redis
import json
from o2app import bootstrap
from o2common.config import config
-# from o2common.domain import commands
from o2dms.domain import commands
from o2ims.domain import commands as imscmd
+from o2ims.domain.subscription_obj import Message2SMO, RegistrationMessage
+from o2ims.domain.alarm_obj import AlarmEvent2SMO
from o2common.helper import o2logging
-from o2ims.domain.subscription_obj import Message2SMO, NotificationEventEnum,\
- RegistrationMessage
-from o2ims.domain.alarm_obj import AlarmEvent2SMO
logger = o2logging.get_logger(__name__)
r = redis.Redis(**config.get_redis_host_and_port())
pubsub = r.pubsub(ignore_subscribe_messages=True)
pubsub.subscribe("NfDeploymentStateChanged")
pubsub.subscribe('ResourceChanged')
- pubsub.subscribe('ConfigurationChanged')
pubsub.subscribe('OcloudChanged')
pubsub.subscribe('AlarmEventChanged')
eventtype=data['notificationEventType'],
updatetime=data['updatetime']))
bus.handle(cmd)
- elif channel == 'ConfigurationChanged':
- datastr = m['data']
- data = json.loads(datastr)
- logger.info('ConfigurationChanged with cmd:{}'.format(data))
- cmd = imscmd.Register2SMO(data=RegistrationMessage(id=data['id']))
- bus.handle(cmd)
elif channel == 'OcloudChanged':
datastr = m['data']
data = json.loads(datastr)
logger.info('OcloudChanged with cmd:{}'.format(data))
- if data['notificationEventType'] == NotificationEventEnum.CREATE:
- cmd = imscmd.Register2SMO(data=RegistrationMessage(is_all=True))
- bus.handle(cmd)
+ cmd = imscmd.Register2SMO(data=RegistrationMessage(
+ data['notificationEventType'],
+ id=data['id']))
+ bus.handle(cmd)
elif channel == 'AlarmEventChanged':
datastr = m['data']
data = json.loads(datastr)
from o2dms.service import nfdeployment_handler
# from dataclasses import asdict
-from typing import List, Dict, Callable, Type
+from typing import Dict, Callable, Type
# TYPE_CHECKING
from o2ims.domain import commands, events
from o2ims.service.command import notify_handler, registration_handler,\
notify_alarm_handler
from o2ims.service.event import ocloud_event, resource_event, \
- resource_pool_event, configuration_event, alarm_event
+ resource_pool_event, alarm_event
# if TYPE_CHECKING:
# from . import unit_of_work
events.ResourceChanged: [resource_event.notify_resource_change],
events.ResourcePoolChanged: [resource_pool_event.\
notify_resourcepool_change],
- events.ConfigurationChanged: [configuration_event.\
- notify_configuration_change],
events.AlarmEventChanged: [alarm_event.\
notify_alarm_event_change],
} # type: Dict[Type[events.Event], Callable]
conf.read(file_path)
default_group = self._set('DEFAULT')
for option in conf['DEFAULT']:
- print(option)
default_group._set(option, conf['DEFAULT'][option])
for section in conf.sections():
group = self._set(section)
return get_root_api_base() + 'o2ims-infrastructureMonitoring/v1'
-def get_provision_api_base():
- return get_root_api_base() + 'provision/v1'
-
-
def get_o2dms_api_base():
return get_root_api_base() + "o2dms/v1"
def serialize(self):
try:
- # d = {c: getattr(self, c) for c in inspect(self).attrs.keys()}
- # if 'createtime' in d:
- # d['createtime'] = d['createtime'].isoformat()
- # if 'updatetime' in d:
- # d['updatetime'] = d['updatetime'].isoformat()
- # return d
- return {c: getattr(self, c) for c in inspect(self).attrs.keys()}
+ d = {c: getattr(self, c) for c in inspect(self).attrs.keys()}
+ if 'createtime' in d:
+ d['createtime'] = d['createtime'].isoformat()
+ if 'updatetime' in d:
+ d['updatetime'] = d['updatetime'].isoformat()
+ return d
+ # return {c: getattr(self, c) for c in inspect(self).attrs.keys()}
except NoInspectionAvailable:
return self.__dict__
from typing import List
-from o2ims.domain import ocloud, subscription_obj, configuration_obj
+from o2ims.domain import ocloud, subscription_obj
from o2ims.domain.ocloud_repo import OcloudRepository, ResourceTypeRepository,\
ResourcePoolRepository, ResourceRepository, DeploymentManagerRepository
from o2ims.domain.subscription_repo import SubscriptionRepository
-from o2ims.domain.configuration_repo import ConfigurationRepository
from o2common.helper import o2logging
logger = o2logging.get_logger(__name__)
def _delete(self, subscription_id):
self.session.query(subscription_obj.Subscription).filter_by(
subscriptionId=subscription_id).delete()
-
-
-class ConfigurationSqlAlchemyRepository(ConfigurationRepository):
- def __init__(self, session):
- super().__init__()
- self.session = session
-
- def _add(self, configuration: configuration_obj.Configuration):
- self.session.add(configuration)
-
- def _get(self, configuration_id) -> configuration_obj.Configuration:
- return self.session.query(configuration_obj.Configuration).filter_by(
- configurationId=configuration_id).first()
-
- def _list(self) -> List[configuration_obj.Configuration]:
- return self.session.query(configuration_obj.Configuration)
-
- def _update(self, configuration: configuration_obj.Configuration):
- self.session.add(configuration)
-
- def _delete(self, configuration_id):
- self.session.query(configuration_obj.Configuration).filter_by(
- configurationId=configuration_id).delete()
from o2ims.domain import ocloud as ocloudModel
from o2ims.domain import subscription_obj as subModel
-from o2ims.domain import configuration_obj as confModel
from o2ims.domain import alarm_obj as alarmModel
from o2ims.domain.resource_type import ResourceTypeEnum
# from o2ims.domain.alarm_obj import AlarmLastChangeEnum, PerceivedSeverityEnum
Column("globalcloudId", String(255)),
Column("name", String(255)),
Column("description", String(255)),
- Column("infrastructureManagementServiceEndpoint", String(255))
+ Column("serviceUri", String(255))
# Column("extensions", String(1024))
)
Column("filter", String(255)),
)
-configuration = Table(
- "configuration",
- metadata,
- Column("updatetime", DateTime),
- Column("createtime", DateTime),
-
- Column("configurationId", String(255), primary_key=True),
- Column("conftype", String(255)),
- Column("callback", String(255)),
- Column("status", String(255)),
- Column("comments", String(255)),
-)
-
alarm_definition = Table(
"alarmDefinition",
metadata,
}
)
mapper(subModel.Subscription, subscription)
- mapper(confModel.Configuration, configuration)
# IMS Infrastruture Monitoring Mappering
mapper(alarmModel.AlarmEventRecord, alarm_event_record)
+++ /dev/null
-# Copyright (C) 2021 Wind River Systems, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from __future__ import annotations
-from enum import Enum
-# from dataclasses import dataclass
-
-from o2common.domain.base import AgRoot, Serializer
-
-
-class RegistrationStatusEnum(str, Enum):
- CREATED = 'CREATED'
- NOTIFIED = 'NOTIFIED'
- FAILED = 'FAILED'
-
-
-class ConfigurationTypeEnum(str, Enum):
- SMO = 'SMO'
-
-
-class Configuration(AgRoot, Serializer):
- def __init__(self, id: str, url: str,
- conf_type: ConfigurationTypeEnum,
- status: RegistrationStatusEnum =
- RegistrationStatusEnum.CREATED,
- comments: str = '') -> None:
- super().__init__()
- self.configurationId = id
- self.conftype = conf_type
- self.callback = url
- self.status = status
- self.comments = comments
-
- def serialize_smo(self):
- if self.conftype != ConfigurationTypeEnum.SMO:
- return
-
- d = Serializer.serialize(self)
-
- d['endpoint'] = d['callback']
- d['id'] = d['configurationId']
- return d
+++ /dev/null
-# Copyright (C) 2021 Wind River Systems, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import abc
-from typing import List, Set
-from o2ims.domain import configuration_obj as obj
-
-
-class ConfigurationRepository(abc.ABC):
- def __init__(self):
- self.seen = set() # type: Set[obj.Configuration]
-
- def add(self, configuration: obj.Configuration):
- self._add(configuration)
- self.seen.add(configuration)
-
- def get(self, configuration_id) -> obj.Configuration:
- configuration = self._get(configuration_id)
- if configuration:
- self.seen.add(configuration)
- return configuration
-
- def list(self) -> List[obj.Configuration]:
- return self._list()
-
- def update(self, configuration: obj.Configuration):
- self._update(configuration)
-
- def delete(self, configuration_id):
- self._delete(configuration_id)
-
- @abc.abstractmethod
- def _add(self, configuration: obj.Configuration):
- raise NotImplementedError
-
- @abc.abstractmethod
- def _get(self, configuration_id) -> obj.Configuration:
- raise NotImplementedError
-
- @abc.abstractmethod
- def _update(self, configuration: obj.Configuration):
- raise NotImplementedError
-
- @abc.abstractmethod
- def _delete(self, configuration_id):
- raise NotImplementedError
updatetime: datetime.now()
-@dataclass
-class ConfigurationChanged(Event):
- id: str
- updatetime: datetime.now()
-
-
@dataclass
class AlarmEventChanged(Event):
id: str
self.version_number = version_number
self.name = name
self.description = description
- self.infrastructureManagementServiceEndpoint = imsendpoint
+ self.serviceUri = imsendpoint
self.resourcePools = []
self.deploymentManagers = []
self.resourceTypes = []
class RegistrationMessage(Serializer):
- def __init__(self, is_all: bool = None, id: str = '') -> None:
- self.all = is_all if is_all is not None else False
+ def __init__(self, eventtype: NotificationEventEnum, id: str = '') -> None:
+ self.notificationEventType = eventtype
self.id = id
from retry import retry
from o2common.service.unit_of_work import AbstractUnitOfWork
-from o2common.config import config
+from o2common.config import config, conf
+
from o2ims.domain import commands
-from o2ims.domain.configuration_obj import ConfigurationTypeEnum, \
- RegistrationStatusEnum
+from o2ims.domain.subscription_obj import NotificationEventEnum
from o2common.helper import o2logging
logger = o2logging.get_logger(__name__)
):
logger.info('In registry_to_smo')
data = cmd.data
- logger.info('The Register2SMO all is {}'.format(data.all))
- if data.all:
- confs = uow.configrations.list()
- for conf in confs:
- if conf.conftype != ConfigurationTypeEnum.SMO:
- continue
- reg_data = conf.serialize()
- logger.debug('Configuration: {}'.format(
- reg_data['configurationId']))
-
- register_smo(uow, reg_data)
- else:
- with uow:
- conf = uow.configurations.get(data.id)
- if conf is None:
- return
- logger.debug('Configuration: {}'.format(conf.configurationId))
- conf_data = conf.serialize()
- register_smo(uow, conf_data)
+ logger.info('The Register2SMO notificationEventType is {}'.format(
+ data.notificationEventType))
+ with uow:
+ ocloud = uow.oclouds.get(data.id)
+ if ocloud is None:
+ return
+ logger.debug('O-Cloud Global UUID: {}'.format(ocloud.globalcloudId))
+ ocloud_dict = ocloud.serialize()
+ if data.notificationEventType == NotificationEventEnum.CREATE:
+ register_smo(uow, ocloud_dict)
-def register_smo(uow, reg_data):
- call_res = call_smo(reg_data)
+def register_smo(uow, ocloud_data):
+ call_res = call_smo(ocloud_data)
logger.debug('Call SMO response is {}'.format(call_res))
- if call_res:
- reg = uow.configurations.get(reg_data['configurationId'])
- if reg is None:
- return
- reg.status = RegistrationStatusEnum.NOTIFIED
- logger.debug('Updating Configurations: {}'.format(
- reg.configurationId))
- uow.configurations.update(reg)
- uow.commit()
+ # TODO: record the result for the smo register
# def retry(fun, max_tries=2):
@retry((ConnectionRefusedError), tries=2, delay=2)
def call_smo(reg_data: dict):
callback_data = json.dumps({
- 'consumerSubscriptionId': reg_data['configurationId'],
- 'imsUrl': config.get_api_url()
+ 'consumerSubscriptionId': reg_data['globalcloudId'],
+ 'notificationEventType': 'CREATE',
+ 'objectRef': config.get_api_url(),
+ 'postObjectState': reg_data
})
logger.info('URL: {}, data: {}'.format(
- reg_data['callback'], callback_data))
+ conf.DEFAULT.smo_register_url, callback_data))
- o = urlparse(reg_data['callback'])
+ o = urlparse(conf.DEFAULT.smo_register_url)
conn = http.client.HTTPConnection(o.netloc)
headers = {'Content-type': 'application/json'}
conn.request('POST', o.path, callback_data, headers)
+++ /dev/null
-# Copyright (C) 2021 Wind River Systems, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from typing import Callable
-
-from o2ims.domain import events
-
-from o2common.helper import o2logging
-logger = o2logging.get_logger(__name__)
-
-
-def notify_configuration_change(
- event: events.ConfigurationChanged,
- publish: Callable,
-):
- logger.info('In notify_registration_change')
- publish("ConfigurationChanged", event)
- logger.debug("published Registration Changed: {}".format(
- event.id))
from o2common.config import config
-from . import ocloud_route, provision_route, alarm_route
+from . import ocloud_route, alarm_route
from . import api_ns
from o2common.helper import o2logging
def configure_namespace(app):
apiims = config.get_o2ims_api_base()
- apiprovision = config.get_provision_api_base()
apimonitoring = config.get_o2ims_monitoring_api_base()
logger.info(
- "Expose the O2 IMS API:{}\nExpose Provision API: {} \
+ "Expose the O2 IMS API:{}\n \
\nExpose Monitoring API: {}".
- format(apiims, apiprovision, apimonitoring))
+ format(apiims, apimonitoring))
ocloud_route.configure_api_route()
- provision_route.configure_api_route()
alarm_route.configure_api_route()
app.add_namespace(api_ns.api_ims_inventory_v1, path=apiims)
- app.add_namespace(api_ns.api_provision_v1, path=apiprovision)
app.add_namespace(api_ns.api_monitoring_v1, path=apimonitoring)
+++ /dev/null
-# Copyright (C) 2021 Wind River Systems, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from flask_restx import fields
-
-from o2ims.views.api_ns import api_provision_v1
-
-
-class SmoEndpointDTO:
-
- endpoint_get = api_provision_v1.model(
- "SmoEndpointGetDto",
- {
- 'id': fields.String(required=True,
- description='SMO Endpoint Configuration ID'),
- 'endpoint': fields.String,
- 'status': fields.String,
- 'comments': fields.String,
- }
- )
-
- endpoint = api_provision_v1.model(
- "SmoEndpointCreateDto",
- {
- 'endpoint': fields.String(
- required=True,
- description='Configuration SMO callback address',
- example='http://mock_smo:80/registration')
- }
- )
-
- endpoint_post_resp = api_provision_v1.model(
- "SmoEndpointCreatedRespDto",
- {
- 'id': fields.String(required=True,
- description='SMO Endpoint Configuration ID'),
- }
- )
+++ /dev/null
-# Copyright (C) 2021 Wind River Systems, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from flask_restx import Resource
-
-from o2common.service.messagebus import MessageBus
-from o2ims.views import provision_view
-from o2ims.views.api_ns import api_provision_v1
-from o2ims.views.provision_dto import SmoEndpointDTO
-
-
-def configure_api_route():
- # Set global bus for resource
- global bus
- bus = MessageBus.get_instance()
-
-
-# ---------- SMO endpoint ---------- #
-@api_provision_v1.route("/smo-endpoint")
-class SmoEndpointListRouter(Resource):
-
- model = SmoEndpointDTO.endpoint_get
- expect = SmoEndpointDTO.endpoint
- post_resp = SmoEndpointDTO.endpoint_post_resp
-
- @api_provision_v1.doc('List SMO endpoints')
- @api_provision_v1.marshal_list_with(model)
- def get(self):
- return provision_view.configurations(bus.uow)
-
- @api_provision_v1.doc('Create a SMO endpoint')
- @api_provision_v1.expect(expect)
- @api_provision_v1.marshal_with(post_resp, code=201)
- def post(self):
- data = api_provision_v1.payload
- result = provision_view.configuration_create(data, bus)
- return result, 201
-
-
-@api_provision_v1.route("/smo-endpoint/<configurationID>")
-@api_provision_v1.param('configurationID',
- 'ID of the SMO endpoint configuration')
-@api_provision_v1.response(404, 'SMO Endpoint configuration not found')
-class SmoEndpointGetDelRouter(Resource):
-
- model = SmoEndpointDTO.endpoint_get
-
- @api_provision_v1.doc('Get configuration by ID')
- @api_provision_v1.marshal_with(model)
- def get(self, configurationID):
- result = provision_view.configuration_one(
- configurationID, bus.uow)
- if result is not None:
- return result
- api_provision_v1.abort(404,
- "SMO Endpoint configuration {} doesn't exist".
- format(configurationID))
-
- @api_provision_v1.doc('Delete configuration by ID')
- @api_provision_v1.response(204, 'Configuration deleted')
- def delete(self, configurationID):
- result = provision_view.configuration_delete(configurationID, bus.uow)
- return result, 204
+++ /dev/null
-# Copyright (C) 2021 Wind River Systems, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import logging
-import uuid
-from datetime import datetime
-
-from o2common.service import unit_of_work, messagebus
-from o2ims.domain import events
-from o2ims.views.provision_dto import SmoEndpointDTO
-from o2ims.domain.configuration_obj import Configuration, ConfigurationTypeEnum
-
-
-def configurations(uow: unit_of_work.AbstractUnitOfWork):
- with uow:
- li = uow.configurations.list()
- return [r.serialize_smo() for r in li]
-
-
-def configuration_one(configurationId: str,
- uow: unit_of_work.AbstractUnitOfWork):
- with uow:
- first = uow.configurations.get(configurationId)
- return first.serialize_smo() if first is not None else None
-
-
-def configuration_create(configurationDto: SmoEndpointDTO.endpoint,
- bus: messagebus.MessageBus):
-
- conf_uuid = str(uuid.uuid4())
- configuration = Configuration(
- conf_uuid, configurationDto['endpoint'], ConfigurationTypeEnum.SMO)
- with bus.uow as uow:
- uow.configurations.add(configuration)
- logging.debug('before event length {}'.format(
- len(configuration.events)))
- configuration.events.append(events.ConfigurationChanged(
- conf_uuid,
- datetime.now()))
- logging.debug('after event length {}'.format(
- len(configuration.events)))
- uow.commit()
- _handle_events(bus)
- return {"id": conf_uuid}
-
-
-def configuration_delete(configurationId: str,
- uow: unit_of_work.AbstractUnitOfWork):
- with uow:
- uow.configurations.delete(configurationId)
- uow.commit()
- return True
-
-
-def _handle_events(bus: messagebus.MessageBus):
- # handle events
- events = bus.uow.collect_new_events()
- for event in events:
- bus.handle(event)
- return True
+# Copyright (C) 2022 Wind River Systems, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
# pylint: disable=redefined-outer-name
import shutil
import subprocess
from tenacity import retry, stop_after_delay
from unittest.mock import MagicMock
-from o2common.config import config
-
-from o2ims.adapter.orm import metadata, start_o2ims_mappers
-# from o2ims.adapter.clients.orm_stx import start_o2ims_stx_mappers
-
-from o2app.adapter import unit_of_work
-from o2ims.views import configure_namespace
-
from o2app.bootstrap import bootstrap
+from o2ims.views import configure_namespace
+from o2app.adapter import unit_of_work
+from o2ims.adapter.orm import metadata, start_o2ims_mappers
+from o2common.config import config
-#import os
-#os.environ['ALARM_YAML'] = 'configs/alarm.yaml'
+# import os
+# os.environ['O2APP_CONFIG'] = 'configs/o2app.conf'
+# os.environ['ALARM_YAML'] = 'configs/alarm.yaml'
@pytest.fixture
from unittest.mock import MagicMock
# from o2dms.domain import dms
-from o2ims.domain import ocloud, subscription_obj, configuration_obj
+from o2ims.domain import ocloud, subscription_obj
from o2ims.domain import resource_type as rt
from o2ims.views import ocloud_view
from o2common.config import config
subscription1.subscriptionId == subscription_id1
-def test_new_configuration():
- configuration_id1 = str(uuid.uuid4())
- configuration1 = configuration_obj.Configuration(
- configuration_id1, "https://callback/uri/write/here",
- "SMO")
- assert configuration_id1 is not None and\
- configuration1.configurationId == configuration_id1
-
-
def test_view_olcouds(mock_uow):
session, uow = mock_uow
+++ /dev/null
-# Copyright (C) 2021 Wind River Systems, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import uuid
-from unittest.mock import MagicMock
-
-from o2ims.domain import configuration_obj
-from o2ims.views import provision_view
-from o2common.config import config
-
-
-def test_new_smo_endpoint():
- configuration_id1 = str(uuid.uuid4())
- configuration1 = configuration_obj.Configuration(
- configuration_id1, "https://callback/uri/write/here",
- "SMO")
- assert configuration_id1 is not None and\
- configuration1.configurationId == configuration_id1
-
-
-def test_view_smo_endpoint(mock_uow):
- session, uow = mock_uow
-
- configuration_id1 = str(uuid.uuid4())
- conf1 = MagicMock()
- conf1.serialize_smo.return_value = {
- "id": configuration_id1,
- }
- session.return_value.query.return_value = [conf1]
-
- configuration_list = provision_view.configurations(uow)
- assert str(configuration_list[0].get(
- "id")) == configuration_id1
-
-
-def test_view_smo_endpoint_one(mock_uow):
- session, uow = mock_uow
-
- configuration_id1 = str(uuid.uuid4())
- session.return_value.query.return_value.filter_by.return_value.first.\
- return_value.serialize_smo.return_value = None
-
- # Query return None
- configuration_res = provision_view.configuration_one(
- configuration_id1, uow)
- assert configuration_res is None
-
- session.return_value.query.return_value.filter_by.return_value.first.\
- return_value.serialize_smo.return_value = {
- "id": configuration_id1,
- }
-
- configuration_res = provision_view.configuration_one(
- configuration_id1, uow)
- assert str(configuration_res.get(
- "id")) == configuration_id1
-
-
-def test_flask_get_list(mock_flask_uow):
- session, app = mock_flask_uow
- session.query.return_value = []
- apibase = config.get_provision_api_base()
-
- with app.test_client() as client:
- # Get list and return empty list
- ##########################
- resp = client.get(apibase+"/smo-endpoint")
- assert resp.get_data() == b'[]\n'
-
-
-def test_flask_get_one(mock_flask_uow):
- session, app = mock_flask_uow
-
- session.return_value.query.return_value.filter_by.return_value.\
- first.return_value = None
- apibase = config.get_provision_api_base()
-
- with app.test_client() as client:
- # Get one and return 404
- ###########################
- configuration_id1 = str(uuid.uuid4())
- resp = client.get(apibase+"/smo-endpoint/"+configuration_id1)
- assert resp.status_code == 404
-
-
-# def test_flask_post(mock_flask_uow):
-# session, app = mock_flask_uow
-# apibase = config.get_provision_api_base()
-
-# with app.test_client() as client:
-# session.return_value.execute.return_value = []
-
-# conf_callback = 'http://registration/callback/url'
-# resp = client.post(apibase+'/smo-endpoint', json={
-# 'endpoint': conf_callback
-# })
-# assert resp.status_code == 201
-# assert 'id' in resp.get_json()
-
-
-def test_flask_delete(mock_flask_uow):
- session, app = mock_flask_uow
- apibase = config.get_provision_api_base()
-
- with app.test_client() as client:
- session.return_value.execute.return_value.first.return_value = {}
-
- configuration_id1 = str(uuid.uuid4())
- resp = client.delete(apibase+"/smo-endpoint/"+configuration_id1)
- assert resp.status_code == 204
-
-
-def test_flask_not_allowed(mock_flask_uow):
- _, app = mock_flask_uow
- apibase = config.get_provision_api_base()
-
- with app.test_client() as client:
-
- # Testing SMO endpoint not support method
- ##########################
- uri = apibase + "/smo-endpoint"
- resp = client.put(uri)
- assert resp.status == '405 METHOD NOT ALLOWED'
- resp = client.patch(uri)
- assert resp.status == '405 METHOD NOT ALLOWED'
- resp = client.delete(uri)
- assert resp.status == '405 METHOD NOT ALLOWED'
-
- configuration_id1 = str(uuid.uuid4())
- uri = apibase + "/smo-endpoint/" + configuration_id1
- resp = client.post(uri)
- assert resp.status == '405 METHOD NOT ALLOWED'
- resp = client.put(uri)
- assert resp.status == '405 METHOD NOT ALLOWED'
- resp = client.patch(uri)
- assert resp.status == '405 METHOD NOT ALLOWED'