From 730b4043e3512893704b4ec66492812dd3f0adc1 Mon Sep 17 00:00:00 2001 From: "Zhang Rong(Jon)" Date: Fri, 24 Dec 2021 16:54:24 +0800 Subject: [PATCH] Add the command that registers to the SMO; Make the create registration and create ocloud event link to the register command 1. Add a command that registers to the SMO, it can base on a parameter to make a choice that it calls all SMO in the confiration or not 2. Create a registration event that can trigger the register action 3. Update the ocloud changed event that can trigger the register action 4. Redesign the Registration domain that updates column key to clarify the different status, includeing created, notified, and failed Issue-ID: INF-249 Signed-off-by: Zhang Rong(Jon) Change-Id: Ia734688b47c5125a3a1e1158d544f218ab741576 (cherry picked from commit 3da89330f3837ac6cffd2cad4c4018c9f8c3327d) --- docker-compose.yml | 1 + mock_smo/mock_smo/entrypoints/mock_smo.py | 6 ++ o2app/adapter/unit_of_work.py | 6 ++ o2app/entrypoints/redis_eventconsumer.py | 19 ++++- o2app/service/handlers.py | 9 ++- o2common/domain/base.py | 1 + o2common/domain/events.py | 2 + o2ims/adapter/orm.py | 5 +- o2ims/domain/commands.py | 7 +- o2ims/domain/events.py | 7 ++ o2ims/domain/subscription_obj.py | 32 ++++++-- o2ims/service/auditor/ocloud_handler.py | 5 +- o2ims/service/command/notify_handler.py | 8 +- o2ims/service/command/registration_handler.py | 103 ++++++++++++++++++++++++++ o2ims/service/event/registration_event.py | 30 ++++++++ o2ims/views/ocloud_route.py | 2 +- o2ims/views/ocloud_view.py | 24 +++++- 17 files changed, 240 insertions(+), 27 deletions(-) create mode 100644 o2ims/service/command/registration_handler.py create mode 100644 o2ims/service/event/registration_event.py diff --git a/docker-compose.yml b/docker-compose.yml index 563ab4b..135537e 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -11,6 +11,7 @@ services: - postgres - redis environment: + - API_HOST=api - DB_HOST=postgres - DB_PASSWORD=o2ims123 - REDIS_HOST=redis diff --git a/mock_smo/mock_smo/entrypoints/mock_smo.py b/mock_smo/mock_smo/entrypoints/mock_smo.py index f5c5895..aea0840 100644 --- a/mock_smo/mock_smo/entrypoints/mock_smo.py +++ b/mock_smo/mock_smo/entrypoints/mock_smo.py @@ -81,6 +81,12 @@ def callback(): return '', 202 +@app.route('/registration', methods=['POST']) +def registration(): + logger.info('Registration data: {}'.format(request.get_data())) + return '', 200 + + def subscription_ims(url, consumerSubscriptionId): sub_key = r.get(REDIS_SUB_KEY) logger.info('Subscription key is {}'.format(sub_key)) diff --git a/o2app/adapter/unit_of_work.py b/o2app/adapter/unit_of_work.py index 1c9cae0..de48001 100644 --- a/o2app/adapter/unit_of_work.py +++ b/o2app/adapter/unit_of_work.py @@ -90,6 +90,12 @@ class SqlAlchemyUnitOfWork(AbstractUnitOfWork): for entry in self.deployment_managers.seen: while hasattr(entry, 'events') and len(entry.events) > 0: yield entry.events.pop(0) + for entry in self.subscriptions.seen: + while hasattr(entry, 'events') and len(entry.events) > 0: + yield entry.events.pop(0) + for entry in self.registrations.seen: + while hasattr(entry, 'events') and len(entry.events) > 0: + yield entry.events.pop(0) for entry in self.nfdeployment_descs.seen: while hasattr(entry, 'events') and len(entry.events) > 0: yield entry.events.pop(0) diff --git a/o2app/entrypoints/redis_eventconsumer.py b/o2app/entrypoints/redis_eventconsumer.py index ea49edd..d4c7c65 100644 --- a/o2app/entrypoints/redis_eventconsumer.py +++ b/o2app/entrypoints/redis_eventconsumer.py @@ -22,7 +22,7 @@ from o2dms.domain import commands from o2ims.domain import commands as imscmd from o2common.helper import o2logging -from o2ims.domain.subscription_obj import Message2SMO +from o2ims.domain.subscription_obj import Message2SMO, NotificationEventEnum, RegistrationMessage logger = o2logging.get_logger(__name__) r = redis.Redis(**config.get_redis_host_and_port()) @@ -36,6 +36,8 @@ def main(): pubsub = r.pubsub(ignore_subscribe_messages=True) pubsub.subscribe("NfDeploymentStateChanged") pubsub.subscribe('ResourceChanged') + pubsub.subscribe('RegistrationChanged') + pubsub.subscribe('OcloudChanged') for m in pubsub.listen(): try: @@ -58,7 +60,7 @@ def handle_dms_changed(m, bus): ToState=data['ToState'] ) bus.handle(cmd) - if channel == 'ResourceChanged': + elif channel == 'ResourceChanged': datastr = m['data'] data = json.loads(datastr) logger.info('ResourceChanged with cmd:{}'.format(data)) @@ -69,6 +71,19 @@ def handle_dms_changed(m, bus): eventtype=data['notificationEventType'], updatetime=data['updatetime'])) bus.handle(cmd) + elif channel == 'RegistrationChanged': + datastr = m['data'] + data = json.loads(datastr) + logger.info('RegistrationChanged with cmd:{}'.format(data)) + cmd = imscmd.Register2SMO(data=RegistrationMessage(id=data['id'])) + bus.handle(cmd) + elif channel == 'OcloudChanged': + datastr = m['data'] + data = json.loads(datastr) + logger.info('OcloudChanged with cmd:{}'.format(data)) + if data['notificationEventType'] == NotificationEventEnum.CREATE: + cmd = imscmd.Register2SMO(data=RegistrationMessage(is_all=True)) + bus.handle(cmd) else: logger.info("unhandled:{}".format(channel)) diff --git a/o2app/service/handlers.py b/o2app/service/handlers.py index 833ef4e..deef1a4 100644 --- a/o2app/service/handlers.py +++ b/o2app/service/handlers.py @@ -27,9 +27,9 @@ from o2ims.service.auditor import ocloud_handler, dms_handler, \ resourcepool_handler, pserver_handler, pserver_cpu_handler, \ pserver_mem_handler, pserver_port_handler, pserver_if_handler,\ pserver_eth_handler -from o2ims.service.command import notify_handler -from o2ims.service.event import ocloud_event, \ - resource_event, resource_pool_event +from o2ims.service.command import notify_handler, registration_handler +from o2ims.service.event import ocloud_event, resource_event, \ + resource_pool_event, registration_event # if TYPE_CHECKING: # from . import unit_of_work @@ -55,6 +55,8 @@ EVENT_HANDLERS = { events.ResourceChanged: [resource_event.notify_resource_change], events.ResourcePoolChanged: [resource_pool_event.\ notify_resourcepool_change], + events.RegistrationChanged: [registration_event.\ + notify_registration_change], } # type: Dict[Type[events.Event], Callable] @@ -77,4 +79,5 @@ COMMAND_HANDLERS = { o2dms_cmmands.DeleteNfDeployment: nfdeployment_handler.delete_nfdeployment, commands.PubMessage2SMO: notify_handler.notify_change_to_smo, + commands.Register2SMO: registration_handler.registry_to_smo, } # type: Dict[Type[commands.Command], Callable] diff --git a/o2common/domain/base.py b/o2common/domain/base.py index 130fdad..d7c94cd 100644 --- a/o2common/domain/base.py +++ b/o2common/domain/base.py @@ -16,6 +16,7 @@ from datetime import datetime from typing import List from sqlalchemy.inspection import inspect from sqlalchemy.exc import NoInspectionAvailable +# from sqlalchemy.orm.exc import DetachedInstanceError from .events import Event diff --git a/o2common/domain/events.py b/o2common/domain/events.py index 19d7e11..50d25e6 100644 --- a/o2common/domain/events.py +++ b/o2common/domain/events.py @@ -15,6 +15,8 @@ # pylint: disable=too-few-public-methods # from dataclasses import dataclass +# from datetime import datetime + class Event: pass diff --git a/o2ims/adapter/orm.py b/o2ims/adapter/orm.py index 6b290a5..29fff79 100644 --- a/o2ims/adapter/orm.py +++ b/o2ims/adapter/orm.py @@ -24,7 +24,7 @@ from sqlalchemy import ( # Date, DateTime, ForeignKey, - Boolean, + # Boolean, # engine, # event, ) @@ -153,7 +153,8 @@ registration = Table( Column("registrationId", String(255), primary_key=True), Column("callback", String(255)), - Column("notified", Boolean), + Column("status", String(255)), + Column("comments", String(255)), ) diff --git a/o2ims/domain/commands.py b/o2ims/domain/commands.py index 2b2aca6..bcd6e86 100644 --- a/o2ims/domain/commands.py +++ b/o2ims/domain/commands.py @@ -19,7 +19,7 @@ from dataclasses import dataclass # from datetime import datetime # from o2ims.domain.resource_type import ResourceTypeEnum from o2ims.domain.stx_object import StxGenericModel -from o2ims.domain.subscription_obj import Message2SMO +from o2ims.domain.subscription_obj import Message2SMO, RegistrationMessage from o2common.domain.commands import Command @@ -33,6 +33,11 @@ class PubMessage2SMO(Command): data: Message2SMO +@dataclass +class Register2SMO(Command): + data: RegistrationMessage + + @dataclass class UpdateOCloud(UpdateStxObject): pass diff --git a/o2ims/domain/events.py b/o2ims/domain/events.py index a4a2375..6f81a84 100644 --- a/o2ims/domain/events.py +++ b/o2ims/domain/events.py @@ -15,6 +15,7 @@ # pylint: disable=too-few-public-methods from dataclasses import dataclass from datetime import datetime + from o2common.domain.events import Event from o2ims.domain.subscription_obj import NotificationEventEnum @@ -45,3 +46,9 @@ class ResourceChanged(Event): resourcePoolId: str notificationEventType: NotificationEventEnum updatetime: datetime.now() + + +@dataclass +class RegistrationChanged(Event): + id: str + updatetime: datetime.now() diff --git a/o2ims/domain/subscription_obj.py b/o2ims/domain/subscription_obj.py index 846bf95..ff8beaf 100644 --- a/o2ims/domain/subscription_obj.py +++ b/o2ims/domain/subscription_obj.py @@ -30,14 +30,6 @@ class Subscription(AgRoot, Serializer): self.filter = filter -class Registration(AgRoot, Serializer): - def __init__(self, id: str, url: str) -> None: - super().__init__() - self.registrationId = id - self.callback = url - self.notified = False - - class NotificationEventEnum(str, Enum): CREATE = 'CREATE' MODIFY = 'MODIFY' @@ -53,6 +45,30 @@ class Message2SMO(Serializer): self.updatetime = updatetime +class RegistrationStatusEnum(str, Enum): + CREATED = 'CREATED' + NOTIFIED = 'NOTIFIED' + FAILED = 'FAILED' + + +class Registration(AgRoot, Serializer): + def __init__(self, id: str, url: str, + status: RegistrationStatusEnum = + RegistrationStatusEnum.CREATED, + comments: str = '') -> None: + super().__init__() + self.registrationId = id + self.callback = url + self.status = status + self.comments = comments + + +class RegistrationMessage(Serializer): + def __init__(self, is_all: bool = None, id: str = '') -> None: + self.all = is_all if is_all is not None else False + self.id = id + + @dataclass class EventState(): Initial = 0 diff --git a/o2ims/service/auditor/ocloud_handler.py b/o2ims/service/auditor/ocloud_handler.py index 97cca3e..7548d58 100644 --- a/o2ims/service/auditor/ocloud_handler.py +++ b/o2ims/service/auditor/ocloud_handler.py @@ -14,7 +14,7 @@ # pylint: disable=unused-argument from __future__ import annotations -from typing import Callable +# from typing import Callable # from dataclasses import asdict # from typing import List, Dict, Callable, Type @@ -41,8 +41,7 @@ class InvalidResourceType(Exception): def update_ocloud( cmd: commands.UpdateOCloud, - uow: AbstractUnitOfWork, - publish: Callable + uow: AbstractUnitOfWork ): stxobj = cmd.data with uow: diff --git a/o2ims/service/command/notify_handler.py b/o2ims/service/command/notify_handler.py index 3ece84a..9555623 100644 --- a/o2ims/service/command/notify_handler.py +++ b/o2ims/service/command/notify_handler.py @@ -13,12 +13,12 @@ # limitations under the License. import json -import redis +# import redis # import requests import http.client from urllib.parse import urlparse -from o2common.config import config +# from o2common.config import config from o2common.service.unit_of_work import AbstractUnitOfWork from o2ims.domain import commands from o2ims.domain.subscription_obj import Subscription, Message2SMO @@ -26,8 +26,8 @@ from o2ims.domain.subscription_obj import Subscription, Message2SMO from o2common.helper import o2logging logger = o2logging.get_logger(__name__) -# Maybe another MQ server -r = redis.Redis(**config.get_redis_host_and_port()) +# # Maybe another MQ server +# r = redis.Redis(**config.get_redis_host_and_port()) def notify_change_to_smo( diff --git a/o2ims/service/command/registration_handler.py b/o2ims/service/command/registration_handler.py new file mode 100644 index 0000000..0a4395d --- /dev/null +++ b/o2ims/service/command/registration_handler.py @@ -0,0 +1,103 @@ +# Copyright (C) 2021 Wind River Systems, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# import time +import json +# import asyncio +# import requests +import http.client +from urllib.parse import urlparse +from retry import retry + +from o2common.service.unit_of_work import AbstractUnitOfWork +from o2common.config import config +from o2ims.domain import commands +from o2ims.domain.subscription_obj import RegistrationStatusEnum + +from o2common.helper import o2logging +logger = o2logging.get_logger(__name__) + + +def registry_to_smo( + cmd: commands.Register2SMO, + uow: AbstractUnitOfWork, +): + logger.info('In registry_to_smo') + data = cmd.data + logger.info('The Register2SMO all is {}'.format(data.all)) + if data.all: + regs = uow.registrations.list() + for reg in regs: + reg_data = reg.serialize() + logger.debug('Registration: {}'.format(reg_data['registrationId'])) + + register_smo(uow, reg_data) + else: + with uow: + reg = uow.registrations.get(data.id) + if reg is None: + return + logger.debug('Registration: {}'.format(reg.registrationId)) + reg_data = reg.serialize() + register_smo(uow, reg_data) + + +def register_smo(uow, reg_data): + call_res = call_smo(reg_data) + logger.debug('Call SMO response is {}'.format(call_res)) + if call_res: + reg = uow.registrations.get(reg_data['registrationId']) + if reg is None: + return + reg.status = RegistrationStatusEnum.NOTIFIED + logger.debug('Updating Registration: {}'.format( + reg.registrationId)) + uow.registrations.update(reg) + uow.commit() + + +# def retry(fun, max_tries=2): +# for i in range(max_tries): +# try: +# time.sleep(5*i) +# # await asyncio.sleep(5*i) +# res = fun() +# logger.debug('retry function result: {}'.format(res)) +# return res +# except Exception: +# continue + + +@retry((ConnectionRefusedError), tries=2, delay=2) +def call_smo(reg_data: dict): + callback_data = json.dumps({ + 'consumerSubscriptionId': reg_data['registrationId'], + 'imsUrl': config.get_api_url() + }) + logger.info('URL: {}, data: {}'.format( + reg_data['callback'], callback_data)) + + o = urlparse(reg_data['callback']) + conn = http.client.HTTPConnection(o.netloc) + headers = {'Content-type': 'application/json'} + conn.request('POST', o.path, callback_data, headers) + resp = conn.getresponse() + data = resp.read().decode('utf-8') + # json_data = json.loads(data) + if resp.status == 202 or resp.status == 200: + logger.info('Registrer to SMO successed, response code {} {}, data {}'. + format(resp.status, resp.reason, data)) + return True + logger.error('Response code is: {}'.format(resp.status)) + return False diff --git a/o2ims/service/event/registration_event.py b/o2ims/service/event/registration_event.py new file mode 100644 index 0000000..e05270e --- /dev/null +++ b/o2ims/service/event/registration_event.py @@ -0,0 +1,30 @@ +# Copyright (C) 2021 Wind River Systems, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Callable + +from o2ims.domain import events + +from o2common.helper import o2logging +logger = o2logging.get_logger(__name__) + + +def notify_registration_change( + event: events.RegistrationChanged, + publish: Callable, +): + logger.info('In notify_registration_change') + publish("RegistrationChanged", event) + logger.debug("published Registration Changed: {}".format( + event.id)) diff --git a/o2ims/views/ocloud_route.py b/o2ims/views/ocloud_route.py index c50907b..99adb5f 100644 --- a/o2ims/views/ocloud_route.py +++ b/o2ims/views/ocloud_route.py @@ -227,7 +227,7 @@ class RegistrationListRouter(Resource): @api_ims_inventory_v1.marshal_with(post_resp, code=201) def post(self): data = api_ims_inventory_v1.payload - result = ocloud_view.registration_create(data, bus.uow) + result = ocloud_view.registration_create(data, bus) return result, 201 diff --git a/o2ims/views/ocloud_view.py b/o2ims/views/ocloud_view.py index 386f8f5..ae8b204 100644 --- a/o2ims/views/ocloud_view.py +++ b/o2ims/views/ocloud_view.py @@ -12,9 +12,12 @@ # See the License for the specific language governing permissions and # limitations under the License. +import logging import uuid +from datetime import datetime -from o2common.service import unit_of_work +from o2common.service import unit_of_work, messagebus +from o2ims.domain import events from o2ims.views.ocloud_dto import RegistrationDTO, SubscriptionDTO from o2ims.domain.subscription_obj import Registration, Subscription @@ -131,14 +134,21 @@ def registration_one(registrationId: str, def registration_create(registrationDto: RegistrationDTO.registration, - uow: unit_of_work.AbstractUnitOfWork): + bus: messagebus.MessageBus): reg_uuid = str(uuid.uuid4()) registration = Registration( reg_uuid, registrationDto['callback']) - with uow: + with bus.uow as uow: uow.registrations.add(registration) + logging.debug('before event length {}'.format( + len(registration.events))) + registration.events.append(events.RegistrationChanged( + reg_uuid, + datetime.now())) + logging.debug('after event length {}'.format(len(registration.events))) uow.commit() + _handle_events(bus) return {"registrationId": reg_uuid} @@ -148,3 +158,11 @@ def registration_delete(registrationId: str, uow.registrations.delete(registrationId) uow.commit() return True + + +def _handle_events(bus: messagebus.MessageBus): + # handle events + events = bus.uow.collect_new_events() + for event in events: + bus.handle(event) + return True -- 2.16.6