Add the command that registers to the SMO; Make the create registration and create... 72/7472/2
authorZhang Rong(Jon) <rong.zhang@windriver.com>
Fri, 24 Dec 2021 08:54:24 +0000 (16:54 +0800)
committerZhang Rong(Jon) <rong.zhang@windriver.com>
Mon, 27 Dec 2021 02:17:43 +0000 (10:17 +0800)
1. Add a command that registers to the SMO, it can base on a
   parameter to make a choice that it calls all SMO in the
   confiration or not
2. Create a registration event that can trigger the
   register action
3. Update the ocloud changed event that can trigger the
   register action
4. Redesign the Registration domain that updates column key
   to clarify the different status, includeing created,
   notified, and failed

Issue-ID: INF-249
Signed-off-by: Zhang Rong(Jon) <rong.zhang@windriver.com>
Change-Id: Ia734688b47c5125a3a1e1158d544f218ab741576

17 files changed:
docker-compose.yml
mock_smo/mock_smo/entrypoints/mock_smo.py
o2app/adapter/unit_of_work.py
o2app/entrypoints/redis_eventconsumer.py
o2app/service/handlers.py
o2common/domain/base.py
o2common/domain/events.py
o2ims/adapter/orm.py
o2ims/domain/commands.py
o2ims/domain/events.py
o2ims/domain/subscription_obj.py
o2ims/service/auditor/ocloud_handler.py
o2ims/service/command/notify_handler.py
o2ims/service/command/registration_handler.py [new file with mode: 0644]
o2ims/service/event/registration_event.py [new file with mode: 0644]
o2ims/views/ocloud_route.py
o2ims/views/ocloud_view.py

index 563ab4b..135537e 100644 (file)
@@ -11,6 +11,7 @@ services:
       - postgres
       - redis
     environment:
+      - API_HOST=api
       - DB_HOST=postgres
       - DB_PASSWORD=o2ims123
       - REDIS_HOST=redis
index f5c5895..aea0840 100644 (file)
@@ -81,6 +81,12 @@ def callback():
     return '', 202\r
 \r
 \r
+@app.route('/registration', methods=['POST'])\r
+def registration():\r
+    logger.info('Registration data: {}'.format(request.get_data()))\r
+    return '', 200\r
+\r
+\r
 def subscription_ims(url, consumerSubscriptionId):\r
     sub_key = r.get(REDIS_SUB_KEY)\r
     logger.info('Subscription key is {}'.format(sub_key))\r
index 1c9cae0..de48001 100644 (file)
@@ -90,6 +90,12 @@ class SqlAlchemyUnitOfWork(AbstractUnitOfWork):
         for entry in self.deployment_managers.seen:\r
             while hasattr(entry, 'events') and len(entry.events) > 0:\r
                 yield entry.events.pop(0)\r
+        for entry in self.subscriptions.seen:\r
+            while hasattr(entry, 'events') and len(entry.events) > 0:\r
+                yield entry.events.pop(0)\r
+        for entry in self.registrations.seen:\r
+            while hasattr(entry, 'events') and len(entry.events) > 0:\r
+                yield entry.events.pop(0)\r
         for entry in self.nfdeployment_descs.seen:\r
             while hasattr(entry, 'events') and len(entry.events) > 0:\r
                 yield entry.events.pop(0)\r
index ea49edd..d4c7c65 100644 (file)
@@ -22,7 +22,7 @@ from o2dms.domain import commands
 from o2ims.domain import commands as imscmd
 
 from o2common.helper import o2logging
-from o2ims.domain.subscription_obj import Message2SMO
+from o2ims.domain.subscription_obj import Message2SMO, NotificationEventEnum, RegistrationMessage
 logger = o2logging.get_logger(__name__)
 
 r = redis.Redis(**config.get_redis_host_and_port())
@@ -36,6 +36,8 @@ def main():
     pubsub = r.pubsub(ignore_subscribe_messages=True)
     pubsub.subscribe("NfDeploymentStateChanged")
     pubsub.subscribe('ResourceChanged')
+    pubsub.subscribe('RegistrationChanged')
+    pubsub.subscribe('OcloudChanged')
 
     for m in pubsub.listen():
         try:
@@ -58,7 +60,7 @@ def handle_dms_changed(m, bus):
             ToState=data['ToState']
         )
         bus.handle(cmd)
-    if channel == 'ResourceChanged':
+    elif channel == 'ResourceChanged':
         datastr = m['data']
         data = json.loads(datastr)
         logger.info('ResourceChanged with cmd:{}'.format(data))
@@ -69,6 +71,19 @@ def handle_dms_changed(m, bus):
             eventtype=data['notificationEventType'],
             updatetime=data['updatetime']))
         bus.handle(cmd)
+    elif channel == 'RegistrationChanged':
+        datastr = m['data']
+        data = json.loads(datastr)
+        logger.info('RegistrationChanged with cmd:{}'.format(data))
+        cmd = imscmd.Register2SMO(data=RegistrationMessage(id=data['id']))
+        bus.handle(cmd)
+    elif channel == 'OcloudChanged':
+        datastr = m['data']
+        data = json.loads(datastr)
+        logger.info('OcloudChanged with cmd:{}'.format(data))
+        if data['notificationEventType'] == NotificationEventEnum.CREATE:
+            cmd = imscmd.Register2SMO(data=RegistrationMessage(is_all=True))
+            bus.handle(cmd)
     else:
         logger.info("unhandled:{}".format(channel))
 
index 833ef4e..deef1a4 100644 (file)
@@ -27,9 +27,9 @@ from o2ims.service.auditor import ocloud_handler, dms_handler, \
     resourcepool_handler, pserver_handler, pserver_cpu_handler, \
     pserver_mem_handler, pserver_port_handler, pserver_if_handler,\
     pserver_eth_handler
-from o2ims.service.command import notify_handler
-from o2ims.service.event import ocloud_event, \
-    resource_event, resource_pool_event
+from o2ims.service.command import notify_handler, registration_handler
+from o2ims.service.event import ocloud_event, resource_event, \
+    resource_pool_event, registration_event
 
 # if TYPE_CHECKING:
 #     from . import unit_of_work
@@ -55,6 +55,8 @@ EVENT_HANDLERS = {
     events.ResourceChanged: [resource_event.notify_resource_change],
     events.ResourcePoolChanged: [resource_pool_event.\
                                  notify_resourcepool_change],
+    events.RegistrationChanged: [registration_event.\
+                                 notify_registration_change],
 }  # type: Dict[Type[events.Event], Callable]
 
 
@@ -77,4 +79,5 @@ COMMAND_HANDLERS = {
     o2dms_cmmands.DeleteNfDeployment:
     nfdeployment_handler.delete_nfdeployment,
     commands.PubMessage2SMO: notify_handler.notify_change_to_smo,
+    commands.Register2SMO: registration_handler.registry_to_smo,
 }  # type: Dict[Type[commands.Command], Callable]
index 130fdad..d7c94cd 100644 (file)
@@ -16,6 +16,7 @@ from datetime import datetime
 from typing import List\r
 from sqlalchemy.inspection import inspect\r
 from sqlalchemy.exc import NoInspectionAvailable\r
+# from sqlalchemy.orm.exc import DetachedInstanceError\r
 from .events import Event\r
 \r
 \r
index 19d7e11..50d25e6 100644 (file)
@@ -15,6 +15,8 @@
 # pylint: disable=too-few-public-methods
 # from dataclasses import dataclass
 
+# from datetime import datetime
+
 
 class Event:
     pass
index 6b290a5..29fff79 100644 (file)
@@ -24,7 +24,7 @@ from sqlalchemy import (
     # Date,\r
     DateTime,\r
     ForeignKey,\r
-    Boolean,\r
+    Boolean,\r
     # engine,\r
     # event,\r
 )\r
@@ -153,7 +153,8 @@ registration = Table(
 \r
     Column("registrationId", String(255), primary_key=True),\r
     Column("callback", String(255)),\r
-    Column("notified", Boolean),\r
+    Column("status", String(255)),\r
+    Column("comments", String(255)),\r
 )\r
 \r
 \r
index 2b2aca6..bcd6e86 100644 (file)
@@ -19,7 +19,7 @@ from dataclasses import dataclass
 # from datetime import datetime
 # from o2ims.domain.resource_type import ResourceTypeEnum
 from o2ims.domain.stx_object import StxGenericModel
-from o2ims.domain.subscription_obj import Message2SMO
+from o2ims.domain.subscription_obj import Message2SMO, RegistrationMessage
 from o2common.domain.commands import Command
 
 
@@ -33,6 +33,11 @@ class PubMessage2SMO(Command):
     data: Message2SMO
 
 
+@dataclass
+class Register2SMO(Command):
+    data: RegistrationMessage
+
+
 @dataclass
 class UpdateOCloud(UpdateStxObject):
     pass
index a4a2375..6f81a84 100644 (file)
@@ -15,6 +15,7 @@
 # pylint: disable=too-few-public-methods
 from dataclasses import dataclass
 from datetime import datetime
+
 from o2common.domain.events import Event
 from o2ims.domain.subscription_obj import NotificationEventEnum
 
@@ -45,3 +46,9 @@ class ResourceChanged(Event):
     resourcePoolId: str
     notificationEventType: NotificationEventEnum
     updatetime: datetime.now()
+
+
+@dataclass
+class RegistrationChanged(Event):
+    id: str
+    updatetime: datetime.now()
index 846bf95..ff8beaf 100644 (file)
@@ -30,14 +30,6 @@ class Subscription(AgRoot, Serializer):
         self.filter = filter
 
 
-class Registration(AgRoot, Serializer):
-    def __init__(self, id: str, url: str) -> None:
-        super().__init__()
-        self.registrationId = id
-        self.callback = url
-        self.notified = False
-
-
 class NotificationEventEnum(str, Enum):
     CREATE = 'CREATE'
     MODIFY = 'MODIFY'
@@ -53,6 +45,30 @@ class Message2SMO(Serializer):
         self.updatetime = updatetime
 
 
+class RegistrationStatusEnum(str, Enum):
+    CREATED = 'CREATED'
+    NOTIFIED = 'NOTIFIED'
+    FAILED = 'FAILED'
+
+
+class Registration(AgRoot, Serializer):
+    def __init__(self, id: str, url: str,
+                 status: RegistrationStatusEnum =
+                 RegistrationStatusEnum.CREATED,
+                 comments: str = '') -> None:
+        super().__init__()
+        self.registrationId = id
+        self.callback = url
+        self.status = status
+        self.comments = comments
+
+
+class RegistrationMessage(Serializer):
+    def __init__(self, is_all: bool = None, id: str = '') -> None:
+        self.all = is_all if is_all is not None else False
+        self.id = id
+
+
 @dataclass
 class EventState():
     Initial = 0
index 97cca3e..7548d58 100644 (file)
@@ -14,7 +14,7 @@
 
 # pylint: disable=unused-argument
 from __future__ import annotations
-from typing import Callable
+from typing import Callable
 
 # from dataclasses import asdict
 # from typing import List, Dict, Callable, Type
@@ -41,8 +41,7 @@ class InvalidResourceType(Exception):
 
 def update_ocloud(
     cmd: commands.UpdateOCloud,
-    uow: AbstractUnitOfWork,
-    publish: Callable
+    uow: AbstractUnitOfWork
 ):
     stxobj = cmd.data
     with uow:
index 3ece84a..9555623 100644 (file)
 #  limitations under the License.\r
 \r
 import json\r
-import redis\r
+import redis\r
 # import requests\r
 import http.client\r
 from urllib.parse import urlparse\r
 \r
-from o2common.config import config\r
+from o2common.config import config\r
 from o2common.service.unit_of_work import AbstractUnitOfWork\r
 from o2ims.domain import commands\r
 from o2ims.domain.subscription_obj import Subscription, Message2SMO\r
@@ -26,8 +26,8 @@ from o2ims.domain.subscription_obj import Subscription, Message2SMO
 from o2common.helper import o2logging\r
 logger = o2logging.get_logger(__name__)\r
 \r
-# Maybe another MQ server\r
-r = redis.Redis(**config.get_redis_host_and_port())\r
+# Maybe another MQ server\r
+r = redis.Redis(**config.get_redis_host_and_port())\r
 \r
 \r
 def notify_change_to_smo(\r
diff --git a/o2ims/service/command/registration_handler.py b/o2ims/service/command/registration_handler.py
new file mode 100644 (file)
index 0000000..0a4395d
--- /dev/null
@@ -0,0 +1,103 @@
+# Copyright (C) 2021 Wind River Systems, Inc.\r
+#\r
+#  Licensed under the Apache License, Version 2.0 (the "License");\r
+#  you may not use this file except in compliance with the License.\r
+#  You may obtain a copy of the License at\r
+#\r
+#      http://www.apache.org/licenses/LICENSE-2.0\r
+#\r
+#  Unless required by applicable law or agreed to in writing, software\r
+#  distributed under the License is distributed on an "AS IS" BASIS,\r
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+#  See the License for the specific language governing permissions and\r
+#  limitations under the License.\r
+\r
+# import time\r
+import json\r
+# import asyncio\r
+# import requests\r
+import http.client\r
+from urllib.parse import urlparse\r
+from retry import retry\r
+\r
+from o2common.service.unit_of_work import AbstractUnitOfWork\r
+from o2common.config import config\r
+from o2ims.domain import commands\r
+from o2ims.domain.subscription_obj import RegistrationStatusEnum\r
+\r
+from o2common.helper import o2logging\r
+logger = o2logging.get_logger(__name__)\r
+\r
+\r
+def registry_to_smo(\r
+    cmd: commands.Register2SMO,\r
+    uow: AbstractUnitOfWork,\r
+):\r
+    logger.info('In registry_to_smo')\r
+    data = cmd.data\r
+    logger.info('The Register2SMO all is {}'.format(data.all))\r
+    if data.all:\r
+        regs = uow.registrations.list()\r
+        for reg in regs:\r
+            reg_data = reg.serialize()\r
+            logger.debug('Registration: {}'.format(reg_data['registrationId']))\r
+\r
+            register_smo(uow, reg_data)\r
+    else:\r
+        with uow:\r
+            reg = uow.registrations.get(data.id)\r
+            if reg is None:\r
+                return\r
+            logger.debug('Registration: {}'.format(reg.registrationId))\r
+            reg_data = reg.serialize()\r
+            register_smo(uow, reg_data)\r
+\r
+\r
+def register_smo(uow, reg_data):\r
+    call_res = call_smo(reg_data)\r
+    logger.debug('Call SMO response is {}'.format(call_res))\r
+    if call_res:\r
+        reg = uow.registrations.get(reg_data['registrationId'])\r
+        if reg is None:\r
+            return\r
+        reg.status = RegistrationStatusEnum.NOTIFIED\r
+        logger.debug('Updating Registration: {}'.format(\r
+            reg.registrationId))\r
+        uow.registrations.update(reg)\r
+        uow.commit()\r
+\r
+\r
+# def retry(fun, max_tries=2):\r
+#     for i in range(max_tries):\r
+#         try:\r
+#             time.sleep(5*i)\r
+#             # await asyncio.sleep(5*i)\r
+#             res = fun()\r
+#             logger.debug('retry function result: {}'.format(res))\r
+#             return res\r
+#         except Exception:\r
+#             continue\r
+\r
+\r
+@retry((ConnectionRefusedError), tries=2, delay=2)\r
+def call_smo(reg_data: dict):\r
+    callback_data = json.dumps({\r
+        'consumerSubscriptionId': reg_data['registrationId'],\r
+        'imsUrl': config.get_api_url()\r
+    })\r
+    logger.info('URL: {}, data: {}'.format(\r
+        reg_data['callback'], callback_data))\r
+\r
+    o = urlparse(reg_data['callback'])\r
+    conn = http.client.HTTPConnection(o.netloc)\r
+    headers = {'Content-type': 'application/json'}\r
+    conn.request('POST', o.path, callback_data, headers)\r
+    resp = conn.getresponse()\r
+    data = resp.read().decode('utf-8')\r
+    # json_data = json.loads(data)\r
+    if resp.status == 202 or resp.status == 200:\r
+        logger.info('Registrer to SMO successed, response code {} {}, data {}'.\r
+                    format(resp.status, resp.reason, data))\r
+        return True\r
+    logger.error('Response code is: {}'.format(resp.status))\r
+    return False\r
diff --git a/o2ims/service/event/registration_event.py b/o2ims/service/event/registration_event.py
new file mode 100644 (file)
index 0000000..e05270e
--- /dev/null
@@ -0,0 +1,30 @@
+# Copyright (C) 2021 Wind River Systems, Inc.\r
+#\r
+#  Licensed under the Apache License, Version 2.0 (the "License");\r
+#  you may not use this file except in compliance with the License.\r
+#  You may obtain a copy of the License at\r
+#\r
+#      http://www.apache.org/licenses/LICENSE-2.0\r
+#\r
+#  Unless required by applicable law or agreed to in writing, software\r
+#  distributed under the License is distributed on an "AS IS" BASIS,\r
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+#  See the License for the specific language governing permissions and\r
+#  limitations under the License.\r
+\r
+from typing import Callable\r
+\r
+from o2ims.domain import events\r
+\r
+from o2common.helper import o2logging\r
+logger = o2logging.get_logger(__name__)\r
+\r
+\r
+def notify_registration_change(\r
+    event: events.RegistrationChanged,\r
+    publish: Callable,\r
+):\r
+    logger.info('In notify_registration_change')\r
+    publish("RegistrationChanged", event)\r
+    logger.debug("published Registration Changed: {}".format(\r
+        event.id))\r
index c50907b..99adb5f 100644 (file)
@@ -227,7 +227,7 @@ class RegistrationListRouter(Resource):
     @api_ims_inventory_v1.marshal_with(post_resp, code=201)
     def post(self):
         data = api_ims_inventory_v1.payload
-        result = ocloud_view.registration_create(data, bus.uow)
+        result = ocloud_view.registration_create(data, bus)
         return result, 201
 
 
index 386f8f5..ae8b204 100644 (file)
 #  See the License for the specific language governing permissions and\r
 #  limitations under the License.\r
 \r
+import logging\r
 import uuid\r
+from datetime import datetime\r
 \r
-from o2common.service import unit_of_work\r
+from o2common.service import unit_of_work, messagebus\r
+from o2ims.domain import events\r
 from o2ims.views.ocloud_dto import RegistrationDTO, SubscriptionDTO\r
 from o2ims.domain.subscription_obj import Registration, Subscription\r
 \r
@@ -131,14 +134,21 @@ def registration_one(registrationId: str,
 \r
 \r
 def registration_create(registrationDto: RegistrationDTO.registration,\r
-                        uow: unit_of_work.AbstractUnitOfWork):\r
+                        bus: messagebus.MessageBus):\r
 \r
     reg_uuid = str(uuid.uuid4())\r
     registration = Registration(\r
         reg_uuid, registrationDto['callback'])\r
-    with uow:\r
+    with bus.uow as uow:\r
         uow.registrations.add(registration)\r
+        logging.debug('before event length {}'.format(\r
+            len(registration.events)))\r
+        registration.events.append(events.RegistrationChanged(\r
+            reg_uuid,\r
+            datetime.now()))\r
+        logging.debug('after event length {}'.format(len(registration.events)))\r
         uow.commit()\r
+    _handle_events(bus)\r
     return {"registrationId": reg_uuid}\r
 \r
 \r
@@ -148,3 +158,11 @@ def registration_delete(registrationId: str,
         uow.registrations.delete(registrationId)\r
         uow.commit()\r
     return True\r
+\r
+\r
+def _handle_events(bus: messagebus.MessageBus):\r
+    # handle events\r
+    events = bus.uow.collect_new_events()\r
+    for event in events:\r
+        bus.handle(event)\r
+    return True\r