Add subscription and notification for resource changes; fix a bug while pserver node... 08/7408/4
authorZhang Rong(Jon) <rong.zhang@windriver.com>
Thu, 16 Dec 2021 16:50:35 +0000 (00:50 +0800)
committerZhang Rong(Jon) <rong.zhang@windriver.com>
Mon, 20 Dec 2021 08:37:00 +0000 (16:37 +0800)
1. Trigger an event while resource changed, create a command handler to deal the event, let it can callback to SMO.
2. Create a mock SMO server with a simple html page to subscribe to O2IMS resource changing.
3. Fix a bug that when watch the pserver that it has an unavailable node.

Issue-ID: INF-238
Signed-off-by: Zhang Rong(Jon) <rong.zhang@windriver.com>
Change-Id: I13304656a721dbe5d4aec23200063e874eefa521

48 files changed:
Dockerfile.localtest
docker-compose.yml
mock_smo/Dockerfile [new file with mode: 0644]
mock_smo/etc/log.yaml [new file with mode: 0644]
mock_smo/mock_smo/__init__.py [new file with mode: 0644]
mock_smo/mock_smo/config.py [new file with mode: 0644]
mock_smo/mock_smo/entrypoints/__init__.py [new file with mode: 0644]
mock_smo/mock_smo/entrypoints/mock_smo.py [new file with mode: 0644]
mock_smo/mock_smo/logging.py [new file with mode: 0644]
mock_smo/o2app-mock-smo.sh [new file with mode: 0644]
mock_smo/requirements.txt [new file with mode: 0644]
mock_smo/setup.py [new file with mode: 0644]
o2app/adapter/unit_of_work.py
o2app/entrypoints/redis_eventconsumer.py
o2app/entrypoints/resource_watcher.py
o2app/service/handlers.py
o2common/adapter/redis_eventpublisher.py
o2common/domain/base.py
o2common/service/watcher/worker.py
o2ims/adapter/clients/ocloud_sa_client.py
o2ims/adapter/ocloud_repository.py
o2ims/adapter/orm.py
o2ims/domain/commands.py
o2ims/domain/events.py
o2ims/domain/ocloud.py
o2ims/domain/ocloud_repo.py
o2ims/domain/subscription_obj.py [new file with mode: 0644]
o2ims/domain/subscription_repo.py [new file with mode: 0644]
o2ims/service/auditor/ocloud_handler.py
o2ims/service/auditor/pserver_cpu_handler.py
o2ims/service/auditor/pserver_eth_handler.py
o2ims/service/auditor/pserver_handler.py
o2ims/service/auditor/pserver_if_handler.py
o2ims/service/auditor/pserver_mem_handler.py
o2ims/service/auditor/pserver_port_handler.py
o2ims/service/auditor/resourcepool_handler.py
o2ims/service/event/__init__.py [new file with mode: 0644]
o2ims/service/event/notify_handler.py [new file with mode: 0644]
o2ims/service/event/ocloud_event.py [new file with mode: 0644]
o2ims/service/event/resource_event.py [new file with mode: 0644]
o2ims/service/event/resource_pool_event.py [new file with mode: 0644]
o2ims/views/ocloud_view.py
requirements-test.txt
tests/integration/test_ocloud_repository.py
tests/integration/test_ocloud_view.py
tests/mock_smo/subscription.py [new file with mode: 0644]
tests/unit/test_ocloud.py
tests/unit/test_watcher.py

index eee30ef..0324e33 100644 (file)
@@ -52,6 +52,6 @@ RUN curl -O https://get.helm.sh/helm-v3.3.1-linux-amd64.tar.gz;
 RUN tar -zxvf helm-v3.3.1-linux-amd64.tar.gz; cp linux-amd64/helm /usr/local/bin\r
 \r
 RUN mkdir -p /etc/kubeconfig/\r
-COPY temp/kubeconfig/config /etc/kubeconfig/\r
+COPY temp/kubeconfig/config /etc/kubeconfig/\r
 \r
 WORKDIR /src\r
index 477441a..563ab4b 100644 (file)
@@ -93,6 +93,31 @@ services:
       - /bin/sh
       - /tests/o2app-watcher-entry.sh
 
+  mock_smo:
+    build:
+      context: ./mock_smo
+      dockerfile: Dockerfile
+    image: mock-smo
+    depends_on:
+      - mock_smo_redis
+    environment:
+      - API_HOST=api
+      - REDIS_HOST=mock_smo_redis
+      - MOCK_SMO_HOST=mock_smo
+      - PYTHONDONTWRITEBYTECODE=1
+      - FLASK_APP=/mock_smo/entrypoints/mock_smo.py
+      - FLASK_DEBUG=1
+      - PYTHONUNBUFFERED=1
+      - LOGGING_CONFIG_LEVEL=DEBUG
+    volumes:
+      - ./mock_smo/etc:/tmp/etc
+      - ./mock_smo/mock_smo:/mock_smo
+    entrypoint:
+      - /bin/sh
+      - /src/o2app-mock-smo.sh
+    ports:
+      - "5001:80"
+
   postgres:
     image: postgres:9.6
     environment:
@@ -105,3 +130,8 @@ services:
     image: redis:alpine
     ports:
       - "63791:6379"
+
+  mock_smo_redis:
+    image: redis:alpine
+    ports:
+      - "63792:6379"
diff --git a/mock_smo/Dockerfile b/mock_smo/Dockerfile
new file mode 100644 (file)
index 0000000..74d6b61
--- /dev/null
@@ -0,0 +1,23 @@
+FROM python:3.10-slim-buster\r
+\r
+RUN apt-get update; apt-get install -y git gcc\r
+\r
+COPY requirements.txt /tmp/\r
+RUN  pip install -r /tmp/requirements.txt \r
+\r
+# COPY requirements-test.txt /tmp/\r
+# RUN pip install -r /tmp/requirements-test.txt\r
+\r
+RUN mkdir -p /src\r
+COPY mock_smo/ /src/mock_smo/\r
+\r
+COPY setup.py o2app-mock-smo.sh /src/\r
+RUN pip install -e /src\r
+\r
+COPY etc/ /etc/mock_smo/\r
+\r
+# COPY tests/ /tests/\r
+\r
+# RUN apt-get install -y procps vim\r
+\r
+WORKDIR /src\r
diff --git a/mock_smo/etc/log.yaml b/mock_smo/etc/log.yaml
new file mode 100644 (file)
index 0000000..59ab8eb
--- /dev/null
@@ -0,0 +1,49 @@
+# Copyright (C) 2021 Wind River Systems, Inc.\r
+#\r
+#  Licensed under the Apache License, Version 2.0 (the "License");\r
+#  you may not use this file except in compliance with the License.\r
+#  You may obtain a copy of the License at\r
+#\r
+#      http://www.apache.org/licenses/LICENSE-2.0\r
+#\r
+#  Unless required by applicable law or agreed to in writing, software\r
+#  distributed under the License is distributed on an "AS IS" BASIS,\r
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+#  See the License for the specific language governing permissions and\r
+#  limitations under the License.\r
+\r
+version: 1\r
+disable_existing_loggers: False\r
+\r
+loggers:\r
+    root:\r
+      handlers: [console_handler, file_handler]\r
+      level: "WARNING"\r
+      propagate: False\r
+    o2common:\r
+      handlers: [console_handler, file_handler]\r
+      level: "WARNING"\r
+      propagate: False\r
+    o2ims:\r
+      handlers: [console_handler, file_handler]\r
+      level: "DEBUG"\r
+      propagate: False\r
+    o2dms:\r
+      handlers: [console_handler, file_handler]\r
+      level: "DEBUG"\r
+      propagate: False\r
+handlers:\r
+    console_handler:\r
+      level: "DEBUG"\r
+      class: "logging.StreamHandler"\r
+      formatter: "standard"\r
+    file_handler:\r
+      level: "DEBUG"\r
+      class: "logging.handlers.RotatingFileHandler"\r
+      filename: "/var/log/mock_smo.log"\r
+      formatter: "standard"\r
+      maxBytes: 52428800\r
+      backupCount: 10\r
+formatters:\r
+    standard:\r
+      format: "%(asctime)s:[%(name)s]:[%(filename)s]-[%(lineno)d] [%(levelname)s]:%(message)s"\r
diff --git a/mock_smo/mock_smo/__init__.py b/mock_smo/mock_smo/__init__.py
new file mode 100644 (file)
index 0000000..b514342
--- /dev/null
@@ -0,0 +1,13 @@
+# Copyright (C) 2021 Wind River Systems, Inc.\r
+#\r
+#  Licensed under the Apache License, Version 2.0 (the "License");\r
+#  you may not use this file except in compliance with the License.\r
+#  You may obtain a copy of the License at\r
+#\r
+#      http://www.apache.org/licenses/LICENSE-2.0\r
+#\r
+#  Unless required by applicable law or agreed to in writing, software\r
+#  distributed under the License is distributed on an "AS IS" BASIS,\r
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+#  See the License for the specific language governing permissions and\r
+#  limitations under the License.\r
diff --git a/mock_smo/mock_smo/config.py b/mock_smo/mock_smo/config.py
new file mode 100644 (file)
index 0000000..ab5d607
--- /dev/null
@@ -0,0 +1,48 @@
+# Copyright (C) 2021 Wind River Systems, Inc.
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+import os
+
+import mock_smo.logging as logging
+logger = logging.get_logger(__name__)
+
+
+def get_mock_smo_api_url():
+    host = os.environ.get("API_HOST", "localhost")
+    port = 5001 if host == "localhost" else 80
+    return f"http://{host}:{port}"
+
+
+def get_root_api_base():
+    return "/"
+
+
+def get_o2ims_api_base():
+    return get_root_api_base() + 'o2ims_infrastructureInventory/v1'
+
+
+def get_o2dms_api_base():
+    return get_root_api_base() + "o2dms"
+
+
+def get_redis_host_and_port():
+    host = os.environ.get("REDIS_HOST", "localhost")
+    port = 63792 if host == "localhost" else 6379
+    return dict(host=host, port=port)
+
+
+def get_smo_o2endpoint():
+    smo_o2endpoint = os.environ.get(
+        "SMO_O2_ENDPOINT", "http://localhost/smo_sim")
+    return smo_o2endpoint
diff --git a/mock_smo/mock_smo/entrypoints/__init__.py b/mock_smo/mock_smo/entrypoints/__init__.py
new file mode 100644 (file)
index 0000000..b514342
--- /dev/null
@@ -0,0 +1,13 @@
+# Copyright (C) 2021 Wind River Systems, Inc.\r
+#\r
+#  Licensed under the Apache License, Version 2.0 (the "License");\r
+#  you may not use this file except in compliance with the License.\r
+#  You may obtain a copy of the License at\r
+#\r
+#      http://www.apache.org/licenses/LICENSE-2.0\r
+#\r
+#  Unless required by applicable law or agreed to in writing, software\r
+#  distributed under the License is distributed on an "AS IS" BASIS,\r
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+#  See the License for the specific language governing permissions and\r
+#  limitations under the License.\r
diff --git a/mock_smo/mock_smo/entrypoints/mock_smo.py b/mock_smo/mock_smo/entrypoints/mock_smo.py
new file mode 100644 (file)
index 0000000..f5c5895
--- /dev/null
@@ -0,0 +1,116 @@
+# Copyright (C) 2021 Wind River Systems, Inc.\r
+#\r
+#  Licensed under the Apache License, Version 2.0 (the "License");\r
+#  you may not use this file except in compliance with the License.\r
+#  You may obtain a copy of the License at\r
+#\r
+#      http://www.apache.org/licenses/LICENSE-2.0\r
+#\r
+#  Unless required by applicable law or agreed to in writing, software\r
+#  distributed under the License is distributed on an "AS IS" BASIS,\r
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+#  See the License for the specific language governing permissions and\r
+#  limitations under the License.\r
+\r
+import json\r
+import redis\r
+import http.client\r
+from flask import Flask, request\r
+from flask.helpers import url_for\r
+\r
+import mock_smo.config as config\r
+import mock_smo.logging as logging\r
+logger = logging.get_logger(__name__)\r
+\r
+apibase = config.get_o2ims_api_base()\r
+app = Flask(__name__)\r
+\r
+r = redis.Redis(**config.get_redis_host_and_port())\r
+REDIS_SUB_KEY = 'mock_smo_sub_key'\r
+REDIS_O2IMS_URL = 'mock_smo_o2ims_url'\r
+\r
+\r
+@app.route('/', methods=['GET', 'POST'])\r
+def index():\r
+    if request.method == 'POST':\r
+        url = request.form['url']\r
+        consumerSubscriptionId = request.form['consumerSubId']\r
+        sub_id = subscription_ims(url, consumerSubscriptionId)\r
+        return """\r
+<h1>Subscribed O2IMS</h1>\r
+<h3>Subscription ID: %s</h3>\r
+<h3>Subscribed O2IMS URL: %s</h3>\r
+<a href="%s">\r
+   <input type="button" value="Unsubscription" />\r
+</a>\r
+""" % (sub_id, url, url_for('unsubscription'))\r
+    return """\r
+<h1>Subscribe O2IMS</h1>\r
+<form method="POST">\r
+    <label for="url">O2 IMS URL: </label>\r
+    <input type="text" id="url" name="url" value="api:80"></br></br>\r
+    <label for="consumerSubId">Consumer Sub ID: </label>\r
+    <input type="text" id="consumerSubId" name="consumerSubId"></br></br>\r
+    <input type="submit" value="Submit">\r
+</form>\r
+"""\r
+\r
+\r
+@app.route('/unsubscription')\r
+def unsubscription():\r
+    sub_key = r.get(REDIS_SUB_KEY)\r
+    logger.info('Subscription key is {}'.format(sub_key))\r
+    if sub_key is None:\r
+        return '<h1>Already unsubscribed</h1>'\r
+    url = r.get(REDIS_O2IMS_URL).decode('utf-8')\r
+    logger.info('O2 IMS API is: {}'.format(url))\r
+    unsubscription_ims(url, sub_key.decode('utf-8'))\r
+    r.delete(REDIS_O2IMS_URL)\r
+    r.delete(REDIS_SUB_KEY)\r
+    return """\r
+<h1>Unsubscribed O2IMS</h1>\r
+<a href="/">\r
+   <input type="button" value="Go Back" />\r
+</a>\r
+"""\r
+\r
+\r
+@app.route('/callback', methods=['POST'])\r
+def callback():\r
+    logger.info('Callback data: {}'.format(request.get_data()))\r
+    return '', 202\r
+\r
+\r
+def subscription_ims(url, consumerSubscriptionId):\r
+    sub_key = r.get(REDIS_SUB_KEY)\r
+    logger.info('Subscription key is {}'.format(sub_key))\r
+    if sub_key is not None:\r
+        return sub_key.decode('utf-8')\r
+\r
+    logger.info(request.host_url)\r
+    conn = http.client.HTTPConnection(url)\r
+    headers = {'Content-type': 'application/json'}\r
+    post_val = {\r
+        'callback': 'http://mock_smo:80' + url_for('callback'),\r
+        'consumerSubscriptionId': consumerSubscriptionId,\r
+        'filter': '["pserver"]'  # '["pserver","pserver_mem"]'\r
+    }\r
+    json_val = json.dumps(post_val)\r
+    conn.request('POST', apibase+'/subscriptions', json_val, headers)\r
+    resp = conn.getresponse()\r
+    data = resp.read().decode('utf-8')\r
+    logger.info('Subscription response: {} {}, data: {}'.format(\r
+        resp.status, resp.reason, data))\r
+    json_data = json.loads(data)\r
+\r
+    r.set(REDIS_SUB_KEY, json_data['subscriptionId'])\r
+    r.set(REDIS_O2IMS_URL, url)\r
+    return json_data['subscriptionId']\r
+\r
+\r
+def unsubscription_ims(url, subId):\r
+    conn = http.client.HTTPConnection(url)\r
+    conn.request('DELETE', apibase + '/subscriptions/' + subId)\r
+    resp = conn.getresponse()\r
+    logger.info('Unsubscription response: {} {}'.format(\r
+        resp.status, resp.reason))\r
diff --git a/mock_smo/mock_smo/logging.py b/mock_smo/mock_smo/logging.py
new file mode 100644 (file)
index 0000000..cb3b504
--- /dev/null
@@ -0,0 +1,36 @@
+# Copyright (C) 2021 Wind River Systems, Inc.\r
+#\r
+#  Licensed under the Apache License, Version 2.0 (the "License");\r
+#  you may not use this file except in compliance with the License.\r
+#  You may obtain a copy of the License at\r
+#\r
+#      http://www.apache.org/licenses/LICENSE-2.0\r
+#\r
+#  Unless required by applicable law or agreed to in writing, software\r
+#  distributed under the License is distributed on an "AS IS" BASIS,\r
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+#  See the License for the specific language governing permissions and\r
+#  limitations under the License.\r
+\r
+import logging\r
+import logging.config\r
+import logging.handlers\r
+import os\r
+import yaml\r
+\r
+\r
+def get_logger(name=None):\r
+    CONFIG_FILE = os.environ.get(\r
+        "LOGGING_CONFIG_FILE", "/etc/mock_smo/log.yaml")\r
+    if os.path.exists(CONFIG_FILE):\r
+        with open(file=CONFIG_FILE, mode='r', encoding="utf-8") as file:\r
+            config_yaml = yaml.load(stream=file, Loader=yaml.FullLoader)\r
+        logging.config.dictConfig(config=config_yaml)\r
+\r
+    logger = logging.getLogger(name)\r
+\r
+    # override logging level\r
+    LOGGING_CONFIG_LEVEL = os.environ.get("LOGGING_CONFIG_LEVEL", None)\r
+    if LOGGING_CONFIG_LEVEL:\r
+        logger.setLevel(LOGGING_CONFIG_LEVEL)\r
+    return logger\r
diff --git a/mock_smo/o2app-mock-smo.sh b/mock_smo/o2app-mock-smo.sh
new file mode 100644 (file)
index 0000000..137491b
--- /dev/null
@@ -0,0 +1,13 @@
+#!/bin/sh
+
+mkdir -p /etc/mock_smo
+cp -r /tmp/etc/* /etc/mock_smo/
+mkdir -p /src/mock_smo
+cp -r /mock_smo/* /src/mock_smo
+
+# cp -r requirements.txt /src/requirements.txt
+
+pip install -e /src
+
+export FLASK_APP=/mock_smo/entrypoints/mock_smo.py
+flask run --host=0.0.0.0 --port=80
diff --git a/mock_smo/requirements.txt b/mock_smo/requirements.txt
new file mode 100644 (file)
index 0000000..0b2acc6
--- /dev/null
@@ -0,0 +1,5 @@
+flask\r
+flask-restx\r
+redis\r
+PyYAML>=5.4.1\r
+\r
diff --git a/mock_smo/setup.py b/mock_smo/setup.py
new file mode 100644 (file)
index 0000000..3afc7fc
--- /dev/null
@@ -0,0 +1,13 @@
+from setuptools import setup\r
+from setuptools import find_packages\r
+\r
+setup(\r
+    name="mock_smo",\r
+    version="1.0",\r
+    packages=find_packages(),\r
+    license="LICENSE",\r
+    description="Mock SMO server for O2 IMS and DMS",\r
+    install_requires=[\r
+        'httplib2',\r
+    ]\r
+)\r
index 6fd4ae2..facfc45 100644 (file)
@@ -73,13 +73,14 @@ class SqlAlchemyUnitOfWork(AbstractUnitOfWork):
 \r
     def _collect_new_events(self):\r
         for entry in self.oclouds.seen:\r
-            while hasattr(entry, 'events') and len(entry.events) > 0:\r
+            # while hasattr(entry, 'events') and len(entry.events) > 0:\r
+            while entry.events is not None and len(entry.events) > 0:\r
                 yield entry.events.pop(0)\r
         for entry in self.resource_pools.seen:\r
-            while hasattr(entry, 'events') and len(entry.events) > 0:\r
+            while entry.events is not None and len(entry.events) > 0:\r
                 yield entry.events.pop(0)\r
         for entry in self.resources.seen:\r
-            while hasattr(entry, 'events') and len(entry.events) > 0:\r
+            while entry.events is not None and len(entry.events) > 0:\r
                 yield entry.events.pop(0)\r
         for entry in self.resource_types.seen:\r
             while hasattr(entry, 'events') and len(entry.events) > 0:\r
index 8472949..ea49edd 100644 (file)
 #  limitations under the License.
 
 # import json
-from logging import log
 import redis
 import json
 from o2app import bootstrap
 from o2common.config import config
 # from o2common.domain import commands
 from o2dms.domain import commands
-from o2dms.domain import events
+from o2ims.domain import commands as imscmd
 
 from o2common.helper import o2logging
+from o2ims.domain.subscription_obj import Message2SMO
 logger = o2logging.get_logger(__name__)
 
 r = redis.Redis(**config.get_redis_host_and_port())
 
+apibase = config.get_o2ims_api_base()
+
 
 def main():
     logger.info("Redis pubsub starting")
     bus = bootstrap.bootstrap()
     pubsub = r.pubsub(ignore_subscribe_messages=True)
     pubsub.subscribe("NfDeploymentStateChanged")
+    pubsub.subscribe('ResourceChanged')
 
     for m in pubsub.listen():
         try:
@@ -50,11 +53,22 @@ def handle_dms_changed(m, bus):
         data = json.loads(datastr)
         logger.info('HandleNfDeploymentStateChanged with cmd:{}'.format(data))
         cmd = commands.HandleNfDeploymentStateChanged(
-            NfDeploymentId = data['NfDeploymentId'],
-            FromState = data['FromState'],
-            ToState = data['ToState']
+            NfDeploymentId=data['NfDeploymentId'],
+            FromState=data['FromState'],
+            ToState=data['ToState']
         )
         bus.handle(cmd)
+    if channel == 'ResourceChanged':
+        datastr = m['data']
+        data = json.loads(datastr)
+        logger.info('ResourceChanged with cmd:{}'.format(data))
+        ref = apibase + '/resourcePools/' + data['resourcePoolId'] +\
+            '/resources/' + data['id']
+        cmd = imscmd.PubMessage2SMO(data=Message2SMO(
+            id=data['id'], ref=ref,
+            eventtype=data['notificationEventType'],
+            updatetime=data['updatetime']))
+        bus.handle(cmd)
     else:
         logger.info("unhandled:{}".format(channel))
 
index a7bf4b3..38308eb 100644 (file)
@@ -54,7 +54,7 @@ class WatcherService(cotyledon.Service):
         super().__init__(worker_id)\r
         self.args = args\r
         self.bus = bootstrap.bootstrap()\r
-        self.worker = PollWorker()\r
+        self.worker = PollWorker(bus=self.bus)\r
 \r
     def run(self):\r
         try:\r
index ebbf9c0..0665cf2 100644 (file)
@@ -27,6 +27,8 @@ from o2ims.service.auditor import ocloud_handler, dms_handler, \
     resourcepool_handler, pserver_handler, pserver_cpu_handler, \
     pserver_mem_handler, pserver_port_handler, pserver_if_handler,\
     pserver_eth_handler
+from o2ims.service.event import notify_handler, ocloud_event, \
+    resource_event, resource_pool_event
 
 # if TYPE_CHECKING:
 #     from . import unit_of_work
@@ -39,7 +41,7 @@ class InvalidResourceType(Exception):
 EVENT_HANDLERS = {
     o2dms_events.NfDeploymentStateChanged: [
         nfdeployment_handler.publish_nfdeployment_state_change
-    ]
+    ],
     # o2dms_events.NfDeploymentCreated: [
     #     nfdeployment_handler.publish_nfdeployment_created],
     # o2dms_events.NfDeploymentInstalled: [
@@ -48,7 +50,11 @@ EVENT_HANDLERS = {
     #     nfdeployment_handler.publish_nfdeployment_uninstalling],
     # o2dms_events.NfDeploymentUninstalled: [
     #     nfdeployment_handler.publish_nfdeployment_uninstalled]
-} # type: Dict[Type[events.Event], Callable]
+    events.OcloudChanged: [ocloud_event.notify_ocloud_update],
+    events.ResourceChanged: [resource_event.notify_resource_change],
+    events.ResourcePoolChanged: [resource_pool_event.\
+                                 notify_resourcepool_change],
+}  # type: Dict[Type[events.Event], Callable]
 
 
 COMMAND_HANDLERS = {
@@ -61,13 +67,13 @@ COMMAND_HANDLERS = {
     commands.UpdatePserverIf: pserver_if_handler.update_pserver_if,
     commands.UpdatePserverIfPort: pserver_port_handler.update_pserver_port,
     commands.UpdatePserverEth: pserver_eth_handler.update_pserver_eth,
-
-    o2dms_cmmands.HandleNfDeploymentStateChanged: \
-        nfdeployment_handler.handle_nfdeployment_statechanged,
-    o2dms_cmmands.InstallNfDeployment: \
-        nfdeployment_handler.install_nfdeployment,
-    o2dms_cmmands.UninstallNfDeployment: \
-        nfdeployment_handler.uninstall_nfdeployment,
-    o2dms_cmmands.DeleteNfDeployment: \
-        nfdeployment_handler.delete_nfdeployment,
+    o2dms_cmmands.HandleNfDeploymentStateChanged:
+    nfdeployment_handler.handle_nfdeployment_statechanged,
+    o2dms_cmmands.InstallNfDeployment:
+    nfdeployment_handler.install_nfdeployment,
+    o2dms_cmmands.UninstallNfDeployment:
+    nfdeployment_handler.uninstall_nfdeployment,
+    o2dms_cmmands.DeleteNfDeployment:
+    nfdeployment_handler.delete_nfdeployment,
+    commands.PubMessage2SMO: notify_handler.notify_change_to_smo,
 }  # type: Dict[Type[commands.Command], Callable]
index 9fac313..e128ef7 100644 (file)
@@ -28,4 +28,4 @@ r = redis.Redis(**config.get_redis_host_and_port())
 
 def publish(channel, event: events.Event):
     logger.info("publishing: channel=%s, event=%s", channel, event)
-    r.publish(channel, json.dumps(asdict(event)))
+    r.publish(channel, json.dumps(asdict(event), default=str))
index 63d1659..130fdad 100644 (file)
 from datetime import datetime\r
 from typing import List\r
 from sqlalchemy.inspection import inspect\r
+from sqlalchemy.exc import NoInspectionAvailable\r
 from .events import Event\r
 \r
 \r
 class AgRoot:\r
 \r
+    events = []\r
+\r
     def __init__(self) -> None:\r
         self.hash = ""\r
         self.updatetime = datetime.now()\r
@@ -27,17 +30,23 @@ class AgRoot:
         self.events = []  # type: List[Event]\r
         # self.id = ""\r
 \r
+    # def append_event(self, event: Event):\r
+    #     self.events = self.events.append(event)\r
+\r
 \r
 class Serializer(object):\r
 \r
     def serialize(self):\r
-        # d = {c: getattr(self, c) for c in inspect(self).attrs.keys()}\r
-        # if 'createtime' in d:\r
-        #     d['createtime'] = d['createtime'].isoformat()\r
-        # if 'updatetime' in d:\r
-        #     d['updatetime'] = d['updatetime'].isoformat()\r
-        # return d\r
-        return {c: getattr(self, c) for c in inspect(self).attrs.keys()}\r
+        try:\r
+            # d = {c: getattr(self, c) for c in inspect(self).attrs.keys()}\r
+            # if 'createtime' in d:\r
+            #     d['createtime'] = d['createtime'].isoformat()\r
+            # if 'updatetime' in d:\r
+            #     d['updatetime'] = d['updatetime'].isoformat()\r
+            # return d\r
+            return {c: getattr(self, c) for c in inspect(self).attrs.keys()}\r
+        except NoInspectionAvailable:\r
+            return self.__dict__\r
 \r
     @staticmethod\r
     def serialize_list(li):\r
index 64d189d..dbc67b7 100644 (file)
@@ -14,6 +14,7 @@
 \r
 import time\r
 import sched\r
+# from o2common.service.unit_of_work import AbstractUnitOfWork\r
 from o2common.service.watcher.base import WatcherTree\r
 \r
 from o2common.helper import o2logging\r
@@ -21,12 +22,13 @@ logger = o2logging.get_logger(__name__)
 \r
 \r
 class PollWorker(object):\r
-    def __init__(self, interval=10) -> None:\r
+    def __init__(self, interval=10, bus=None) -> None:\r
         super().__init__()\r
         self.watchers = []\r
         self.schedinstance = sched.scheduler(time.time, time.sleep)\r
         self.schedinterval = interval\r
         self._stopped = True\r
+        self._bus = bus\r
 \r
     def set_interval(self, interval):\r
         if interval > 0:\r
@@ -48,6 +50,13 @@ class PollWorker(object):
             except Exception as ex:\r
                 logger.warning("Worker raises exception:" + str(ex))\r
                 continue\r
+\r
+        # handle events\r
+        if self._bus is not None:\r
+            events = self._bus.uow.collect_new_events()\r
+            for event in events:\r
+                self._bus.handle(event)\r
+\r
         self.schedinstance.enter(self.schedinterval, 1, self._repeat)\r
 \r
     # note the sched run will block current thread\r
index 97592d3..d0e12de 100644 (file)
@@ -161,7 +161,7 @@ class StxSaClientImp(object):
         logger.debug('host 1:' + str(hosts[0].to_dict()))\r
         return [ocloudModel.StxGenericModel(\r
             ResourceTypeEnum.PSERVER, self._hostconverter(host))\r
-            for host in hosts if host]\r
+            for host in hosts if host and host.availability == 'available']\r
 \r
     def getPserver(self, id) -> ocloudModel.StxGenericModel:\r
         host = self.stxclient.ihost.get(id)\r
index ab1d5c4..4880fa1 100644 (file)
 
 from typing import List
 
-from o2ims.domain import ocloud
+from o2ims.domain import ocloud, subscription_obj
 from o2ims.domain.ocloud_repo import OcloudRepository, ResourceTypeRepository,\
-    ResourcePoolRepository, ResourceRepository, DeploymentManagerRepository,\
-    SubscriptionRepository
+    ResourcePoolRepository, ResourceRepository, DeploymentManagerRepository
+from o2ims.domain.subscription_repo import SubscriptionRepository
 from o2common.helper import o2logging
 logger = o2logging.get_logger(__name__)
 
@@ -143,19 +143,19 @@ class SubscriptionSqlAlchemyRepository(SubscriptionRepository):
         super().__init__()
         self.session = session
 
-    def _add(self, subscription: ocloud.Subscription):
+    def _add(self, subscription: subscription_obj.Subscription):
         self.session.add(subscription)
 
-    def _get(self, subscription_id) -> ocloud.Subscription:
-        return self.session.query(ocloud.Subscription).filter_by(
+    def _get(self, subscription_id) -> subscription_obj.Subscription:
+        return self.session.query(subscription_obj.Subscription).filter_by(
             subscriptionId=subscription_id).first()
 
-    def _list(self) -> List[ocloud.Subscription]:
-        return self.session.query(ocloud.Subscription)
+    def _list(self) -> List[subscription_obj.Subscription]:
+        return self.session.query(subscription_obj.Subscription)
 
-    def _update(self, subscription: ocloud.Subscription):
+    def _update(self, subscription: subscription_obj.Subscription):
         self.session.add(subscription)
 
     def _delete(self, subscription_id):
-        self.session.query(ocloud.Subscription).filter_by(
+        self.session.query(subscription_obj.Subscription).filter_by(
             subscriptionId=subscription_id).delete()
index 7ad7a20..bb5c984 100644 (file)
@@ -32,6 +32,7 @@ from sqlalchemy.orm import mapper, relationship
 # from sqlalchemy.sql.sqltypes import Integer\r
 \r
 from o2ims.domain import ocloud as ocloudModel\r
+from o2ims.domain import subscription_obj as subModel\r
 from o2ims.domain.resource_type import ResourceTypeEnum\r
 \r
 from o2common.helper import o2logging\r
@@ -166,7 +167,7 @@ def start_o2ims_mappers(engine=None):
             "resourcePools": relationship(resourcepool_mapper)\r
         }\r
     )\r
-    mapper(ocloudModel.Subscription, subscription)\r
+    mapper(subModel.Subscription, subscription)\r
 \r
     if engine is not None:\r
         metadata.create_all(engine)\r
index 657b48f..2b2aca6 100644 (file)
@@ -19,6 +19,7 @@ from dataclasses import dataclass
 # from datetime import datetime
 # from o2ims.domain.resource_type import ResourceTypeEnum
 from o2ims.domain.stx_object import StxGenericModel
+from o2ims.domain.subscription_obj import Message2SMO
 from o2common.domain.commands import Command
 
 
@@ -27,6 +28,11 @@ class UpdateStxObject(Command):
     data: StxGenericModel
 
 
+@dataclass
+class PubMessage2SMO(Command):
+    data: Message2SMO
+
+
 @dataclass
 class UpdateOCloud(UpdateStxObject):
     pass
index 7ab04a0..a4a2375 100644 (file)
 
 # pylint: disable=too-few-public-methods
 from dataclasses import dataclass
+from datetime import datetime
 from o2common.domain.events import Event
+from o2ims.domain.subscription_obj import NotificationEventEnum
 
 
 @dataclass
-class OcloudUpdated(Event):
-    oCloudId: str
+class OcloudChanged(Event):
+    id: str
+    notificationEventType: NotificationEventEnum
+    updatetime: datetime.now()
 
 
 @dataclass
-class ResourceTypeUpdated(Event):
-    oCloudId: str
+class ResourceTypeChanged(Event):
+    id: str
+    updatetime: datetime.now()
+
+
+@dataclass
+class ResourcePoolChanged(Event):
+    id: str
+    notificationEventType: NotificationEventEnum
+    updatetime: datetime.now()
+
+
+@dataclass
+class ResourceChanged(Event):
+    id: str
+    resourcePoolId: str
+    notificationEventType: NotificationEventEnum
+    updatetime: datetime.now()
index e3584ed..1323134 100644 (file)
@@ -23,17 +23,6 @@ from .resource_type import ResourceTypeEnum
 # from uuid import UUID\r
 \r
 \r
-class Subscription(AgRoot, Serializer):\r
-    def __init__(self, id: str, callback: str, consumersubid: str = '',\r
-                 filter: str = '') -> None:\r
-        super().__init__()\r
-        self.subscriptionId = id\r
-        self.version_number = 0\r
-        self.callback = callback\r
-        self.consumerSubscriptionId = consumersubid\r
-        self.filter = filter\r
-\r
-\r
 class DeploymentManager(AgRoot, Serializer):\r
     def __init__(self, id: str, name: str, ocloudid: str,\r
                  dmsendpoint: str, description: str = '',\r
index b224a68..c513eed 100644 (file)
@@ -187,43 +187,3 @@ class DeploymentManagerRepository(abc.ABC):
     @abc.abstractmethod\r
     def _update(self, deployment_manager: ocloud.DeploymentManager):\r
         raise NotImplementedError\r
-\r
-\r
-class SubscriptionRepository(abc.ABC):\r
-    def __init__(self):\r
-        self.seen = set()  # type: Set[ocloud.Subscription]\r
-\r
-    def add(self, subscription: ocloud.Subscription):\r
-        self._add(subscription)\r
-        self.seen.add(subscription)\r
-\r
-    def get(self, subscription_id) -> ocloud.Subscription:\r
-        subscription = self._get(subscription_id)\r
-        if subscription:\r
-            self.seen.add(subscription)\r
-        return subscription\r
-\r
-    def list(self) -> List[ocloud.Subscription]:\r
-        return self._list()\r
-\r
-    def update(self, subscription: ocloud.Subscription):\r
-        self._update(subscription)\r
-\r
-    def delete(self, subscription_id):\r
-        self._delete(subscription_id)\r
-\r
-    @abc.abstractmethod\r
-    def _add(self, subscription: ocloud.Subscription):\r
-        raise NotImplementedError\r
-\r
-    @abc.abstractmethod\r
-    def _get(self, subscription_id) -> ocloud.Subscription:\r
-        raise NotImplementedError\r
-\r
-    @abc.abstractmethod\r
-    def _update(self, subscription: ocloud.Subscription):\r
-        raise NotImplementedError\r
-\r
-    @abc.abstractmethod\r
-    def _delete(self, subscription_id):\r
-        raise NotImplementedError\r
diff --git a/o2ims/domain/subscription_obj.py b/o2ims/domain/subscription_obj.py
new file mode 100644 (file)
index 0000000..dc3145f
--- /dev/null
@@ -0,0 +1,57 @@
+# Copyright (C) 2021 Wind River Systems, Inc.
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+from __future__ import annotations
+from enum import Enum
+from dataclasses import dataclass
+
+from o2common.domain.base import AgRoot, Serializer
+
+
+class Subscription(AgRoot, Serializer):
+    def __init__(self, id: str, callback: str, consumersubid: str = '',
+                 filter: str = '') -> None:
+        super().__init__()
+        self.subscriptionId = id
+        self.version_number = 0
+        self.callback = callback
+        self.consumerSubscriptionId = consumersubid
+        self.filter = filter
+
+
+class NotificationEventEnum(str, Enum):
+    CREATE = 'CREATE'
+    MODIFY = 'MODIFY'
+    DELETE = 'DELETE'
+
+
+class Message2SMO(Serializer):
+    def __init__(self, eventtype: NotificationEventEnum,
+                 id: str, ref: str, updatetime: str) -> None:
+        self.notificationEventType = eventtype
+        self.objectRef = ref
+        self.id = id
+        self.updatetime = updatetime
+
+
+@dataclass
+class EventState():
+    Initial = 0
+    NotInstalled = 1
+    Installing = 2
+    Installed = 3
+    Updating = 4
+    Uninstalling = 5
+    Abnormal = 6
+    Deleted = 7
diff --git a/o2ims/domain/subscription_repo.py b/o2ims/domain/subscription_repo.py
new file mode 100644 (file)
index 0000000..d12c00d
--- /dev/null
@@ -0,0 +1,57 @@
+# Copyright (C) 2021 Wind River Systems, Inc.
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+import abc
+from typing import List, Set
+from o2ims.domain import subscription_obj as subobj
+
+
+class SubscriptionRepository(abc.ABC):
+    def __init__(self):
+        self.seen = set()  # type: Set[subobj.Subscription]
+
+    def add(self, subscription: subobj.Subscription):
+        self._add(subscription)
+        self.seen.add(subscription)
+
+    def get(self, subscription_id) -> subobj.Subscription:
+        subscription = self._get(subscription_id)
+        if subscription:
+            self.seen.add(subscription)
+        return subscription
+
+    def list(self) -> List[subobj.Subscription]:
+        return self._list()
+
+    def update(self, subscription: subobj.Subscription):
+        self._update(subscription)
+
+    def delete(self, subscription_id):
+        self._delete(subscription_id)
+
+    @abc.abstractmethod
+    def _add(self, subscription: subobj.Subscription):
+        raise NotImplementedError
+
+    @abc.abstractmethod
+    def _get(self, subscription_id) -> subobj.Subscription:
+        raise NotImplementedError
+
+    @abc.abstractmethod
+    def _update(self, subscription: subobj.Subscription):
+        raise NotImplementedError
+
+    @abc.abstractmethod
+    def _delete(self, subscription_id):
+        raise NotImplementedError
index c0d8eaf..97cca3e 100644 (file)
 
 # pylint: disable=unused-argument
 from __future__ import annotations
+from typing import Callable
 
-from o2ims.domain.stx_object import StxGenericModel
 # from dataclasses import asdict
 # from typing import List, Dict, Callable, Type
 # TYPE_CHECKING
-from o2ims.domain import commands
+
+from o2common.config import config
+# from o2common.service.messagebus import MessageBus
 from o2common.service.unit_of_work import AbstractUnitOfWork
-from o2ims.domain.resource_type import InvalidOcloudState
-from o2ims.domain.resource_type import MismatchedModel
+from o2ims.domain import events, commands
 from o2ims.domain.ocloud import Ocloud
-from o2common.config import config
+from o2ims.domain.stx_object import StxGenericModel
+from o2ims.domain.resource_type import InvalidOcloudState, MismatchedModel
+from o2ims.domain.subscription_obj import NotificationEventEnum
 # if TYPE_CHECKING:
 #     from . import unit_of_work
 
@@ -38,7 +41,8 @@ class InvalidResourceType(Exception):
 
 def update_ocloud(
     cmd: commands.UpdateOCloud,
-    uow: AbstractUnitOfWork
+    uow: AbstractUnitOfWork,
+    publish: Callable
 ):
     stxobj = cmd.data
     with uow:
@@ -99,4 +103,8 @@ def update_by(ocloud: Ocloud, stxobj: StxGenericModel) -> None:
     # ocloud.content = stxobj.content
     ocloud.hash = stxobj.hash
     ocloud.version_number = ocloud.version_number + 1
-    ocloud.events = []
+    ocloud.events.append(events.OcloudChanged(
+        id=stxobj.id,
+        notificationEventType=NotificationEventEnum.MODIFY,
+        updatetime=stxobj.updatetime
+    ))
index 76141bc..cac4690 100644 (file)
@@ -17,11 +17,12 @@ from __future__ import annotations
 import uuid
 # import json
 
-from o2ims.domain import commands
+from o2ims.domain import commands, events
 from o2ims.domain.stx_object import StxGenericModel
 from o2common.service.unit_of_work import AbstractUnitOfWork
 from o2ims.domain.resource_type import MismatchedModel
 from o2ims.domain.ocloud import Resource, ResourceType
+from o2ims.domain.subscription_obj import NotificationEventEnum
 
 from o2common.helper import o2logging
 logger = o2logging.get_logger(__name__)
@@ -114,4 +115,9 @@ def update_by(target: Resource, stxobj: StxGenericModel,
     target.updatetime = stxobj.updatetime
     target.hash = stxobj.hash
     target.version_number = target.version_number + 1
-    target.events = []
+    target.events.append(events.ResourceChanged(
+        id=stxobj.id,
+        resourcePoolId=target.resourcePoolId,
+        notificationEventType=NotificationEventEnum.MODIFY,
+        updatetime=stxobj.updatetime
+    ))
index e65440a..5f6c341 100644 (file)
@@ -17,11 +17,12 @@ from __future__ import annotations
 import uuid
 # import json
 
-from o2ims.domain import commands
+from o2ims.domain import commands, events
 from o2ims.domain.stx_object import StxGenericModel
 from o2common.service.unit_of_work import AbstractUnitOfWork
 from o2ims.domain.resource_type import MismatchedModel
 from o2ims.domain.ocloud import Resource, ResourceType
+from o2ims.domain.subscription_obj import NotificationEventEnum
 
 from o2common.helper import o2logging
 logger = o2logging.get_logger(__name__)
@@ -114,4 +115,9 @@ def update_by(target: Resource, stxobj: StxGenericModel,
     target.updatetime = stxobj.updatetime
     target.hash = stxobj.hash
     target.version_number = target.version_number + 1
-    target.events = []
+    target.events.append(events.ResourceChanged(
+        id=stxobj.id,
+        resourcePoolId=target.resourcePoolId,
+        notificationEventType=NotificationEventEnum.MODIFY,
+        updatetime=stxobj.updatetime
+    ))
index 63b0534..f5df381 100644 (file)
 from __future__ import annotations
 import uuid
 # import json
+from typing import Callable
 
-from o2ims.domain import commands
+from o2ims.domain import commands, events
 from o2ims.domain.stx_object import StxGenericModel
+from o2ims.domain.subscription_obj import NotificationEventEnum
 from o2common.service.unit_of_work import AbstractUnitOfWork
 from o2ims.domain.resource_type import MismatchedModel
 from o2ims.domain.ocloud import Resource, ResourceType
@@ -33,7 +35,8 @@ class InvalidResourceType(Exception):
 
 def update_pserver(
     cmd: commands.UpdatePserver,
-    uow: AbstractUnitOfWork
+    uow: AbstractUnitOfWork,
+    publish: Callable
 ):
     stxobj = cmd.data
     with uow:
@@ -115,4 +118,9 @@ def update_by(target: Resource, stxobj: StxGenericModel,
     target.updatetime = stxobj.updatetime
     target.hash = stxobj.hash
     target.version_number = target.version_number + 1
-    target.events = []
+    target.events.append(events.ResourceChanged(
+        id=stxobj.id,
+        resourcePoolId=target.resourcePoolId,
+        notificationEventType=NotificationEventEnum.MODIFY,
+        updatetime=stxobj.updatetime
+    ))
index 7ee2df2..cd4e680 100644 (file)
@@ -17,11 +17,12 @@ from __future__ import annotations
 import uuid
 # import json
 
-from o2ims.domain import commands
+from o2ims.domain import commands, events
 from o2ims.domain.stx_object import StxGenericModel
 from o2common.service.unit_of_work import AbstractUnitOfWork
 from o2ims.domain.resource_type import MismatchedModel
 from o2ims.domain.ocloud import Resource, ResourceType
+from o2ims.domain.subscription_obj import NotificationEventEnum
 
 from o2common.helper import o2logging
 logger = o2logging.get_logger(__name__)
@@ -114,4 +115,9 @@ def update_by(target: Resource, stxobj: StxGenericModel,
     target.updatetime = stxobj.updatetime
     target.hash = stxobj.hash
     target.version_number = target.version_number + 1
-    target.events = []
+    target.events.append(events.ResourceChanged(
+        id=stxobj.id,
+        resourcePoolId=target.resourcePoolId,
+        notificationEventType=NotificationEventEnum.MODIFY,
+        updatetime=stxobj.updatetime
+    ))
index a07072b..683475e 100644 (file)
@@ -17,8 +17,9 @@ from __future__ import annotations
 import uuid
 # import json
 
-from o2ims.domain import commands
+from o2ims.domain import commands, events
 from o2ims.domain.stx_object import StxGenericModel
+from o2ims.domain.subscription_obj import NotificationEventEnum
 from o2common.service.unit_of_work import AbstractUnitOfWork
 from o2ims.domain.resource_type import MismatchedModel
 from o2ims.domain.ocloud import Resource, ResourceType
@@ -114,4 +115,9 @@ def update_by(target: Resource, stxobj: StxGenericModel,
     target.updatetime = stxobj.updatetime
     target.hash = stxobj.hash
     target.version_number = target.version_number + 1
-    target.events = []
+    target.events.append(events.ResourceChanged(
+        id=stxobj.id,
+        resourcePoolId=target.resourcePoolId,
+        notificationEventType=NotificationEventEnum.MODIFY,
+        updatetime=stxobj.updatetime
+    ))
index 41bed8e..bfb2e1d 100644 (file)
@@ -17,11 +17,12 @@ from __future__ import annotations
 import uuid
 # import json
 
-from o2ims.domain import commands
+from o2ims.domain import commands, events
 from o2ims.domain.stx_object import StxGenericModel
 from o2common.service.unit_of_work import AbstractUnitOfWork
 from o2ims.domain.resource_type import MismatchedModel
 from o2ims.domain.ocloud import Resource, ResourceType
+from o2ims.domain.subscription_obj import NotificationEventEnum
 
 from o2common.helper import o2logging
 logger = o2logging.get_logger(__name__)
@@ -114,4 +115,9 @@ def update_by(target: Resource, stxobj: StxGenericModel,
     target.updatetime = stxobj.updatetime
     target.hash = stxobj.hash
     target.version_number = target.version_number + 1
-    target.events = []
+    target.events.append(events.ResourceChanged(
+        id=stxobj.id,
+        resourcePoolId=target.resourcePoolId,
+        notificationEventType=NotificationEventEnum.MODIFY,
+        updatetime=stxobj.updatetime
+    ))
index 88cf182..b5b0e0d 100644 (file)
@@ -20,11 +20,12 @@ from o2ims.domain.stx_object import StxGenericModel
 # from dataclasses import asdict
 # from typing import List, Dict, Callable, Type
 # TYPE_CHECKING
-from o2ims.domain import commands
+from o2ims.domain import commands, events
 from o2common.service.unit_of_work import AbstractUnitOfWork
 # from o2ims.domain.resource_type import InvalidOcloudState
 from o2ims.domain.resource_type import MismatchedModel
 from o2ims.domain.ocloud import ResourcePool
+from o2ims.domain.subscription_obj import NotificationEventEnum
 # if TYPE_CHECKING:
 #     from . import unit_of_work
 
@@ -98,4 +99,8 @@ def update_by(target: ResourcePool, stxobj: StxGenericModel,
     target.hash = stxobj.hash
     target.oCloudId = parentid
     target.version_number = target.version_number + 1
-    target.events = []
+    target.events.append(events.ResourcePoolChanged(
+        id=stxobj.id,
+        notificationEventType=NotificationEventEnum.MODIFY,
+        updatetime=stxobj.updatetime
+    ))
diff --git a/o2ims/service/event/__init__.py b/o2ims/service/event/__init__.py
new file mode 100644 (file)
index 0000000..b514342
--- /dev/null
@@ -0,0 +1,13 @@
+# Copyright (C) 2021 Wind River Systems, Inc.\r
+#\r
+#  Licensed under the Apache License, Version 2.0 (the "License");\r
+#  you may not use this file except in compliance with the License.\r
+#  You may obtain a copy of the License at\r
+#\r
+#      http://www.apache.org/licenses/LICENSE-2.0\r
+#\r
+#  Unless required by applicable law or agreed to in writing, software\r
+#  distributed under the License is distributed on an "AS IS" BASIS,\r
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+#  See the License for the specific language governing permissions and\r
+#  limitations under the License.\r
diff --git a/o2ims/service/event/notify_handler.py b/o2ims/service/event/notify_handler.py
new file mode 100644 (file)
index 0000000..3ece84a
--- /dev/null
@@ -0,0 +1,99 @@
+# Copyright (C) 2021 Wind River Systems, Inc.\r
+#\r
+#  Licensed under the Apache License, Version 2.0 (the "License");\r
+#  you may not use this file except in compliance with the License.\r
+#  You may obtain a copy of the License at\r
+#\r
+#      http://www.apache.org/licenses/LICENSE-2.0\r
+#\r
+#  Unless required by applicable law or agreed to in writing, software\r
+#  distributed under the License is distributed on an "AS IS" BASIS,\r
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+#  See the License for the specific language governing permissions and\r
+#  limitations under the License.\r
+\r
+import json\r
+import redis\r
+# import requests\r
+import http.client\r
+from urllib.parse import urlparse\r
+\r
+from o2common.config import config\r
+from o2common.service.unit_of_work import AbstractUnitOfWork\r
+from o2ims.domain import commands\r
+from o2ims.domain.subscription_obj import Subscription, Message2SMO\r
+\r
+from o2common.helper import o2logging\r
+logger = o2logging.get_logger(__name__)\r
+\r
+# Maybe another MQ server\r
+r = redis.Redis(**config.get_redis_host_and_port())\r
+\r
+\r
+def notify_change_to_smo(\r
+    cmd: commands.PubMessage2SMO,\r
+    uow: AbstractUnitOfWork,\r
+):\r
+    logger.info('In notify_change_to_smo')\r
+    data = cmd.data\r
+    with uow:\r
+        subs = uow.subscriptions.list()\r
+        for sub in subs:\r
+            sub_data = sub.serialize()\r
+            logger.debug('Subscription: {}'.format(sub_data['subscriptionId']))\r
+\r
+            try:\r
+                resource_filter = json.loads(sub_data['filter'])\r
+                if len(resource_filter) > 0:\r
+                    resource = uow.resources.get(data.id)\r
+                    logger.debug(type(resource))\r
+                    if resource:  # TODO deal with resource is empty\r
+                        res_type_id = resource.serialize()['resourceTypeId']\r
+                        resourcetype = uow.resource_types.get(res_type_id)\r
+                        logger.debug(resourcetype.name)\r
+                        if resourcetype.name not in resource_filter:\r
+                            continue\r
+            except json.decoder.JSONDecodeError as err:\r
+                logger.warning(\r
+                    'subscription filter decode json failed: {}'.format(err))\r
+\r
+            callback_smo(sub, data)\r
+\r
+\r
+def callback_smo(sub: Subscription, msg: Message2SMO):\r
+    sub_data = sub.serialize()\r
+    callback_data = json.dumps({\r
+        'consumerSubscriptionId': sub_data['consumerSubscriptionId'],\r
+        'notificationEventType': msg.notificationEventType,\r
+        'objectRef': msg.objectRef,\r
+        'updateTime': msg.updatetime\r
+    })\r
+    logger.info('URL: {}, data: {}'.format(\r
+        sub_data['callback'], callback_data))\r
+    # r.publish(sub_data['subscriptionId'], json.dumps({\r
+    #     'consumerSubscriptionId': sub_data['consumerSubscriptionId'],\r
+    #     'notificationEventType': msg.notificationEventType,\r
+    #     'objectRef': msg.objectRef\r
+    # }))\r
+    # try:\r
+    #     headers = {'User-Agent': 'Mozilla/5.0'}\r
+    #     resp = requests.post(sub_data['callback'], data=callback_data,\r
+    #                          headers=headers)\r
+    #     if resp.status_code == 202 or resp.status_code == 200:\r
+    #         logger.info('Notify to SMO successed')\r
+    #         return\r
+    #     logger.error('Response code is: {}'.format(resp.status_code))\r
+    # except requests.exceptions.HTTPError as err:\r
+    #     logger.error('request smo error: {}'.format(err))\r
+    o = urlparse(sub_data['callback'])\r
+    conn = http.client.HTTPConnection(o.netloc)\r
+    headers = {'Content-type': 'application/json'}\r
+    conn.request('POST', o.path, callback_data, headers)\r
+    resp = conn.getresponse()\r
+    data = resp.read().decode('utf-8')\r
+    # json_data = json.loads(data)\r
+    if resp.status == 202 or resp.status == 200:\r
+        logger.info('Notify to SMO successed, response code {} {}, data {}'.\r
+                    format(resp.status, resp.reason, data))\r
+        return\r
+    logger.error('Response code is: {}'.format(resp.status))\r
diff --git a/o2ims/service/event/ocloud_event.py b/o2ims/service/event/ocloud_event.py
new file mode 100644 (file)
index 0000000..dbeb448
--- /dev/null
@@ -0,0 +1,30 @@
+# Copyright (C) 2021 Wind River Systems, Inc.\r
+#\r
+#  Licensed under the Apache License, Version 2.0 (the "License");\r
+#  you may not use this file except in compliance with the License.\r
+#  You may obtain a copy of the License at\r
+#\r
+#      http://www.apache.org/licenses/LICENSE-2.0\r
+#\r
+#  Unless required by applicable law or agreed to in writing, software\r
+#  distributed under the License is distributed on an "AS IS" BASIS,\r
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+#  See the License for the specific language governing permissions and\r
+#  limitations under the License.\r
+\r
+from typing import Callable\r
+\r
+from o2ims.domain import events\r
+\r
+from o2common.helper import o2logging\r
+logger = o2logging.get_logger(__name__)\r
+\r
+\r
+def notify_ocloud_update(\r
+    event: events.OcloudChanged,\r
+    publish: Callable,\r
+):\r
+    logger.info('In notify_ocloud_update')\r
+    publish("OcloudChanged", event)\r
+    logger.debug("published Ocloud Changed: {}".format(\r
+        event.id))\r
diff --git a/o2ims/service/event/resource_event.py b/o2ims/service/event/resource_event.py
new file mode 100644 (file)
index 0000000..2a749b7
--- /dev/null
@@ -0,0 +1,30 @@
+# Copyright (C) 2021 Wind River Systems, Inc.\r
+#\r
+#  Licensed under the Apache License, Version 2.0 (the "License");\r
+#  you may not use this file except in compliance with the License.\r
+#  You may obtain a copy of the License at\r
+#\r
+#      http://www.apache.org/licenses/LICENSE-2.0\r
+#\r
+#  Unless required by applicable law or agreed to in writing, software\r
+#  distributed under the License is distributed on an "AS IS" BASIS,\r
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+#  See the License for the specific language governing permissions and\r
+#  limitations under the License.\r
+\r
+from typing import Callable\r
+\r
+from o2ims.domain import events\r
+\r
+from o2common.helper import o2logging\r
+logger = o2logging.get_logger(__name__)\r
+\r
+\r
+def notify_resource_change(\r
+    event: events.ResourceChanged,\r
+    publish: Callable,\r
+):\r
+    logger.info('In notify_resource_change')\r
+    publish("ResourceChanged", event)\r
+    logger.debug("published Resource Changed: {}".format(\r
+        event.id))\r
diff --git a/o2ims/service/event/resource_pool_event.py b/o2ims/service/event/resource_pool_event.py
new file mode 100644 (file)
index 0000000..9c78d5b
--- /dev/null
@@ -0,0 +1,30 @@
+# Copyright (C) 2021 Wind River Systems, Inc.\r
+#\r
+#  Licensed under the Apache License, Version 2.0 (the "License");\r
+#  you may not use this file except in compliance with the License.\r
+#  You may obtain a copy of the License at\r
+#\r
+#      http://www.apache.org/licenses/LICENSE-2.0\r
+#\r
+#  Unless required by applicable law or agreed to in writing, software\r
+#  distributed under the License is distributed on an "AS IS" BASIS,\r
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+#  See the License for the specific language governing permissions and\r
+#  limitations under the License.\r
+\r
+from typing import Callable\r
+\r
+from o2ims.domain import events\r
+\r
+from o2common.helper import o2logging\r
+logger = o2logging.get_logger(__name__)\r
+\r
+\r
+def notify_resourcepool_change(\r
+    event: events.ResourcePoolChanged,\r
+    publish: Callable,\r
+):\r
+    logger.info('In notify_resourcepool_change')\r
+    publish("ResourcePoolChanged", event)\r
+    logger.debug("published Resource Pool Changed: {}".format(\r
+        event.id))\r
index f970517..3735298 100644 (file)
@@ -16,7 +16,7 @@ import uuid
 \r
 from o2common.service import unit_of_work\r
 from o2ims.views.ocloud_dto import SubscriptionDTO\r
-from o2ims.domain.ocloud import Subscription\r
+from o2ims.domain.subscription_obj import Subscription\r
 \r
 \r
 def oclouds(uow: unit_of_work.AbstractUnitOfWork):\r
@@ -41,7 +41,6 @@ def resource_type_one(resourceTypeId: str,
                       uow: unit_of_work.AbstractUnitOfWork):\r
     with uow:\r
         first = uow.resource_types.get(resourceTypeId)\r
-        print(first)\r
         return first.serialize() if first is not None else None\r
 \r
 \r
index 3e9d882..9b6cbe9 100644 (file)
@@ -5,6 +5,7 @@ requests
 tox\r
 \r
 pytest\r
+pytest-cov\r
 pytest-icdiff\r
 mock\r
 \r
index 6a67447..df91d29 100644 (file)
@@ -17,7 +17,7 @@ import pytest
 
 from o2ims.domain import resource_type as rt
 from o2ims.adapter import ocloud_repository as repository
-from o2ims.domain import ocloud
+from o2ims.domain import ocloud, subscription_obj
 from o2common.config import config
 
 pytestmark = pytest.mark.usefixtures("mappers")
@@ -158,7 +158,7 @@ def test_add_subscription(sqlite_session_factory):
     session = sqlite_session_factory()
     repo = repository.SubscriptionSqlAlchemyRepository(session)
     subscription_id1 = str(uuid.uuid4())
-    subscription1 = ocloud.Subscription(
+    subscription1 = subscription_obj.Subscription(
         subscription_id1, "https://callback/uri/write/here")
     repo.add(subscription1)
     assert repo.get(subscription_id1) == subscription1
index d44bedf..510bf40 100644 (file)
@@ -17,7 +17,7 @@ import pytest
 
 from o2common.config import config
 from o2ims.views import ocloud_view
-from o2ims.domain import ocloud
+from o2ims.domain import ocloud, subscription_obj
 from o2ims.domain import resource_type as rt
 
 
@@ -200,7 +200,7 @@ def test_view_deployment_manager_one(sqlite_uow):
 def test_view_subscriptions(sqlite_uow):
 
     subscription_id1 = str(uuid.uuid4())
-    subscription1 = ocloud.Subscription(
+    subscription1 = subscription_obj.Subscription(
         subscription_id1, "https://callback/uri/write/here")
     with sqlite_uow as uow:
         uow.subscriptions.add(subscription1)
@@ -214,7 +214,7 @@ def test_view_subscriptions(sqlite_uow):
 def test_view_subscription_one(sqlite_uow):
 
     subscription_id1 = str(uuid.uuid4())
-    subscription1 = ocloud.Subscription(
+    subscription1 = subscription_obj.Subscription(
         subscription_id1, "https://callback/uri/write/here")
 
     # Query return None
@@ -235,7 +235,7 @@ def test_view_subscription_one(sqlite_uow):
 def test_view_subscription_delete(sqlite_uow):
 
     subscription_id1 = str(uuid.uuid4())
-    subscription1 = ocloud.Subscription(
+    subscription1 = subscription_obj.Subscription(
         subscription_id1, "https://callback/uri/write/here")
 
     with sqlite_uow as uow:
diff --git a/tests/mock_smo/subscription.py b/tests/mock_smo/subscription.py
new file mode 100644 (file)
index 0000000..d2ab3c7
--- /dev/null
@@ -0,0 +1,110 @@
+# Copyright (C) 2021 Wind River Systems, Inc.
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+import redis
+import json
+import signal
+import http.client
+# from o2common.config import config
+
+# from o2common.helper import o2logging
+# logger = o2logging.get_logger(__name__)
+
+# r = redis.Redis(**config.get_redis_host_and_port())
+r = redis.Redis(host='127.0.0.1', port=63791)
+
+# apibase = config.get_o2ims_api_base()
+# url = config.get_api_url()
+apibase = '/o2ims_infrastructureInventory/v1'
+url = '127.0.0.1:5005'
+
+
+class Singleton(type):
+    _instances = {}
+
+    def __call__(cls, *args, **kwargs):
+        if cls not in cls._instances:
+            cls._instances[cls] = super(
+                Singleton, cls).__call__(*args, **kwargs)
+        return cls._instances[cls]
+
+
+class Subscription(metaclass=Singleton):
+
+    def __init__(self, sub_id='') -> None:
+        self.url = url
+        self.subId = sub_id
+
+    def subscription_ims(self):
+        conn = http.client.HTTPConnection(self.url)
+        headers = {'Content-type': 'application/json'}
+        post_val = {
+            'callback': self.url,
+            'consumerSubscriptionId': 'mock_smo',
+            'filter': '["pserver","pserver_ram"]'
+        }
+        json_val = json.dumps(post_val)
+        conn.request('POST', apibase+'/subscriptions', json_val, headers)
+        resp = conn.getresponse()
+        data = resp.read().decode('utf-8')
+        print(resp.status, resp.reason)
+        print(data)
+        json_data = json.loads(data)
+        self.subId = json_data['subscriptionId']
+
+    def subscription_mq(self):
+        sub = r.pubsub(ignore_subscribe_messages=True)
+        sub.subscribe(self.subId)
+
+        for m in sub.listen():
+            try:
+                # logger.info("handling %s", m)
+                print("handling %s", m)
+                channel = m['channel'].decode("UTF-8")
+                if channel == self.subId:
+                    datastr = m['data']
+                    data = json.loads(datastr)
+                    # logger.info('notification: {}'.format(data))
+                    print('notification: {}'.format(data))
+                else:
+                    # logger.info("unhandled:{}".format(channel))
+                    print("unhandled:{}".format(channel))
+            except Exception as ex:
+                # logger.warning("{}".format(str(ex)))
+                print("[WARNING]{}".format(str(ex)))
+                continue
+
+    def unsubscription_ims(self):
+        conn = http.client.HTTPConnection(self.url)
+        conn.request('DELETE', apibase + '/subscriptions/' + self.subId)
+        resp = conn.getresponse()
+        print(resp.status, resp.reason)
+
+
+def handler(signum, frame):
+    print('\nCtrl-c was pressed. Call to delete subscription')
+    sub = Subscription()
+    sub.unsubscription_ims()
+    exit()
+
+
+def main():
+    sub = Subscription()
+    sub.subscription_ims()
+    signal.signal(signal.SIGINT, handler)
+    sub.subscription_mq()
+
+
+if __name__ == "__main__":
+    main()
index c92cb22..13fc436 100644 (file)
@@ -15,7 +15,7 @@
 import uuid
 from unittest.mock import MagicMock
 
-from o2ims.domain import ocloud
+from o2ims.domain import ocloud, subscription_obj
 from o2ims.domain import resource_type as rt
 from o2ims.views import ocloud_view
 from o2common.config import config
@@ -88,7 +88,7 @@ def test_new_deployment_manager():
 
 def test_new_subscription():
     subscription_id1 = str(uuid.uuid4())
-    subscription1 = ocloud.Subscription(
+    subscription1 = subscription_obj.Subscription(
         subscription_id1, "https://callback/uri/write/here")
     assert subscription_id1 is not None and\
         subscription1.subscriptionId == subscription_id1
index c4b134d..806d20f 100644 (file)
@@ -15,7 +15,7 @@
 import time\r
 from datetime import datetime\r
 import json\r
-from typing import List\r
+from typing import Callable, List\r
 # from o2common.config import config\r
 import uuid\r
 from o2common.service.watcher.base import BaseWatcher, WatcherTree\r
@@ -129,7 +129,8 @@ class FakeUnitOfWork(AbstractUnitOfWork):
 def create_fake_bus(uow):\r
     def update_ocloud(\r
             cmd: commands.UpdateOCloud,\r
-            uow: AbstractUnitOfWork):\r
+            uow: AbstractUnitOfWork,\r
+            publish: Callable):\r
         return\r
 \r
     fakeuow = FakeUnitOfWork()\r