1. Trigger an event while resource changed, create a command handler to deal the event, let it can callback to SMO.
2. Create a mock SMO server with a simple html page to subscribe to O2IMS resource changing.
3. Fix a bug that when watch the pserver that it has an unavailable node.
Issue-ID: INF-238
Signed-off-by: Zhang Rong(Jon) <rong.zhang@windriver.com>
Change-Id: I13304656a721dbe5d4aec23200063e874eefa521
RUN tar -zxvf helm-v3.3.1-linux-amd64.tar.gz; cp linux-amd64/helm /usr/local/bin\r
\r
RUN mkdir -p /etc/kubeconfig/\r
-COPY temp/kubeconfig/config /etc/kubeconfig/\r
+# COPY temp/kubeconfig/config /etc/kubeconfig/\r
\r
WORKDIR /src\r
- /bin/sh
- /tests/o2app-watcher-entry.sh
+ mock_smo:
+ build:
+ context: ./mock_smo
+ dockerfile: Dockerfile
+ image: mock-smo
+ depends_on:
+ - mock_smo_redis
+ environment:
+ - API_HOST=api
+ - REDIS_HOST=mock_smo_redis
+ - MOCK_SMO_HOST=mock_smo
+ - PYTHONDONTWRITEBYTECODE=1
+ - FLASK_APP=/mock_smo/entrypoints/mock_smo.py
+ - FLASK_DEBUG=1
+ - PYTHONUNBUFFERED=1
+ - LOGGING_CONFIG_LEVEL=DEBUG
+ volumes:
+ - ./mock_smo/etc:/tmp/etc
+ - ./mock_smo/mock_smo:/mock_smo
+ entrypoint:
+ - /bin/sh
+ - /src/o2app-mock-smo.sh
+ ports:
+ - "5001:80"
+
postgres:
image: postgres:9.6
environment:
image: redis:alpine
ports:
- "63791:6379"
+
+ mock_smo_redis:
+ image: redis:alpine
+ ports:
+ - "63792:6379"
--- /dev/null
+FROM python:3.10-slim-buster\r
+\r
+RUN apt-get update; apt-get install -y git gcc\r
+\r
+COPY requirements.txt /tmp/\r
+RUN pip install -r /tmp/requirements.txt \r
+\r
+# COPY requirements-test.txt /tmp/\r
+# RUN pip install -r /tmp/requirements-test.txt\r
+\r
+RUN mkdir -p /src\r
+COPY mock_smo/ /src/mock_smo/\r
+\r
+COPY setup.py o2app-mock-smo.sh /src/\r
+RUN pip install -e /src\r
+\r
+COPY etc/ /etc/mock_smo/\r
+\r
+# COPY tests/ /tests/\r
+\r
+# RUN apt-get install -y procps vim\r
+\r
+WORKDIR /src\r
--- /dev/null
+# Copyright (C) 2021 Wind River Systems, Inc.\r
+#\r
+# Licensed under the Apache License, Version 2.0 (the "License");\r
+# you may not use this file except in compliance with the License.\r
+# You may obtain a copy of the License at\r
+#\r
+# http://www.apache.org/licenses/LICENSE-2.0\r
+#\r
+# Unless required by applicable law or agreed to in writing, software\r
+# distributed under the License is distributed on an "AS IS" BASIS,\r
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+# See the License for the specific language governing permissions and\r
+# limitations under the License.\r
+\r
+version: 1\r
+disable_existing_loggers: False\r
+\r
+loggers:\r
+ root:\r
+ handlers: [console_handler, file_handler]\r
+ level: "WARNING"\r
+ propagate: False\r
+ o2common:\r
+ handlers: [console_handler, file_handler]\r
+ level: "WARNING"\r
+ propagate: False\r
+ o2ims:\r
+ handlers: [console_handler, file_handler]\r
+ level: "DEBUG"\r
+ propagate: False\r
+ o2dms:\r
+ handlers: [console_handler, file_handler]\r
+ level: "DEBUG"\r
+ propagate: False\r
+handlers:\r
+ console_handler:\r
+ level: "DEBUG"\r
+ class: "logging.StreamHandler"\r
+ formatter: "standard"\r
+ file_handler:\r
+ level: "DEBUG"\r
+ class: "logging.handlers.RotatingFileHandler"\r
+ filename: "/var/log/mock_smo.log"\r
+ formatter: "standard"\r
+ maxBytes: 52428800\r
+ backupCount: 10\r
+formatters:\r
+ standard:\r
+ format: "%(asctime)s:[%(name)s]:[%(filename)s]-[%(lineno)d] [%(levelname)s]:%(message)s"\r
--- /dev/null
+# Copyright (C) 2021 Wind River Systems, Inc.\r
+#\r
+# Licensed under the Apache License, Version 2.0 (the "License");\r
+# you may not use this file except in compliance with the License.\r
+# You may obtain a copy of the License at\r
+#\r
+# http://www.apache.org/licenses/LICENSE-2.0\r
+#\r
+# Unless required by applicable law or agreed to in writing, software\r
+# distributed under the License is distributed on an "AS IS" BASIS,\r
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+# See the License for the specific language governing permissions and\r
+# limitations under the License.\r
--- /dev/null
+# Copyright (C) 2021 Wind River Systems, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+
+import mock_smo.logging as logging
+logger = logging.get_logger(__name__)
+
+
+def get_mock_smo_api_url():
+ host = os.environ.get("API_HOST", "localhost")
+ port = 5001 if host == "localhost" else 80
+ return f"http://{host}:{port}"
+
+
+def get_root_api_base():
+ return "/"
+
+
+def get_o2ims_api_base():
+ return get_root_api_base() + 'o2ims_infrastructureInventory/v1'
+
+
+def get_o2dms_api_base():
+ return get_root_api_base() + "o2dms"
+
+
+def get_redis_host_and_port():
+ host = os.environ.get("REDIS_HOST", "localhost")
+ port = 63792 if host == "localhost" else 6379
+ return dict(host=host, port=port)
+
+
+def get_smo_o2endpoint():
+ smo_o2endpoint = os.environ.get(
+ "SMO_O2_ENDPOINT", "http://localhost/smo_sim")
+ return smo_o2endpoint
--- /dev/null
+# Copyright (C) 2021 Wind River Systems, Inc.\r
+#\r
+# Licensed under the Apache License, Version 2.0 (the "License");\r
+# you may not use this file except in compliance with the License.\r
+# You may obtain a copy of the License at\r
+#\r
+# http://www.apache.org/licenses/LICENSE-2.0\r
+#\r
+# Unless required by applicable law or agreed to in writing, software\r
+# distributed under the License is distributed on an "AS IS" BASIS,\r
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+# See the License for the specific language governing permissions and\r
+# limitations under the License.\r
--- /dev/null
+# Copyright (C) 2021 Wind River Systems, Inc.\r
+#\r
+# Licensed under the Apache License, Version 2.0 (the "License");\r
+# you may not use this file except in compliance with the License.\r
+# You may obtain a copy of the License at\r
+#\r
+# http://www.apache.org/licenses/LICENSE-2.0\r
+#\r
+# Unless required by applicable law or agreed to in writing, software\r
+# distributed under the License is distributed on an "AS IS" BASIS,\r
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+# See the License for the specific language governing permissions and\r
+# limitations under the License.\r
+\r
+import json\r
+import redis\r
+import http.client\r
+from flask import Flask, request\r
+from flask.helpers import url_for\r
+\r
+import mock_smo.config as config\r
+import mock_smo.logging as logging\r
+logger = logging.get_logger(__name__)\r
+\r
+apibase = config.get_o2ims_api_base()\r
+app = Flask(__name__)\r
+\r
+r = redis.Redis(**config.get_redis_host_and_port())\r
+REDIS_SUB_KEY = 'mock_smo_sub_key'\r
+REDIS_O2IMS_URL = 'mock_smo_o2ims_url'\r
+\r
+\r
+@app.route('/', methods=['GET', 'POST'])\r
+def index():\r
+ if request.method == 'POST':\r
+ url = request.form['url']\r
+ consumerSubscriptionId = request.form['consumerSubId']\r
+ sub_id = subscription_ims(url, consumerSubscriptionId)\r
+ return """\r
+<h1>Subscribed O2IMS</h1>\r
+<h3>Subscription ID: %s</h3>\r
+<h3>Subscribed O2IMS URL: %s</h3>\r
+<a href="%s">\r
+ <input type="button" value="Unsubscription" />\r
+</a>\r
+""" % (sub_id, url, url_for('unsubscription'))\r
+ return """\r
+<h1>Subscribe O2IMS</h1>\r
+<form method="POST">\r
+ <label for="url">O2 IMS URL: </label>\r
+ <input type="text" id="url" name="url" value="api:80"></br></br>\r
+ <label for="consumerSubId">Consumer Sub ID: </label>\r
+ <input type="text" id="consumerSubId" name="consumerSubId"></br></br>\r
+ <input type="submit" value="Submit">\r
+</form>\r
+"""\r
+\r
+\r
+@app.route('/unsubscription')\r
+def unsubscription():\r
+ sub_key = r.get(REDIS_SUB_KEY)\r
+ logger.info('Subscription key is {}'.format(sub_key))\r
+ if sub_key is None:\r
+ return '<h1>Already unsubscribed</h1>'\r
+ url = r.get(REDIS_O2IMS_URL).decode('utf-8')\r
+ logger.info('O2 IMS API is: {}'.format(url))\r
+ unsubscription_ims(url, sub_key.decode('utf-8'))\r
+ r.delete(REDIS_O2IMS_URL)\r
+ r.delete(REDIS_SUB_KEY)\r
+ return """\r
+<h1>Unsubscribed O2IMS</h1>\r
+<a href="/">\r
+ <input type="button" value="Go Back" />\r
+</a>\r
+"""\r
+\r
+\r
+@app.route('/callback', methods=['POST'])\r
+def callback():\r
+ logger.info('Callback data: {}'.format(request.get_data()))\r
+ return '', 202\r
+\r
+\r
+def subscription_ims(url, consumerSubscriptionId):\r
+ sub_key = r.get(REDIS_SUB_KEY)\r
+ logger.info('Subscription key is {}'.format(sub_key))\r
+ if sub_key is not None:\r
+ return sub_key.decode('utf-8')\r
+\r
+ logger.info(request.host_url)\r
+ conn = http.client.HTTPConnection(url)\r
+ headers = {'Content-type': 'application/json'}\r
+ post_val = {\r
+ 'callback': 'http://mock_smo:80' + url_for('callback'),\r
+ 'consumerSubscriptionId': consumerSubscriptionId,\r
+ 'filter': '["pserver"]' # '["pserver","pserver_mem"]'\r
+ }\r
+ json_val = json.dumps(post_val)\r
+ conn.request('POST', apibase+'/subscriptions', json_val, headers)\r
+ resp = conn.getresponse()\r
+ data = resp.read().decode('utf-8')\r
+ logger.info('Subscription response: {} {}, data: {}'.format(\r
+ resp.status, resp.reason, data))\r
+ json_data = json.loads(data)\r
+\r
+ r.set(REDIS_SUB_KEY, json_data['subscriptionId'])\r
+ r.set(REDIS_O2IMS_URL, url)\r
+ return json_data['subscriptionId']\r
+\r
+\r
+def unsubscription_ims(url, subId):\r
+ conn = http.client.HTTPConnection(url)\r
+ conn.request('DELETE', apibase + '/subscriptions/' + subId)\r
+ resp = conn.getresponse()\r
+ logger.info('Unsubscription response: {} {}'.format(\r
+ resp.status, resp.reason))\r
--- /dev/null
+# Copyright (C) 2021 Wind River Systems, Inc.\r
+#\r
+# Licensed under the Apache License, Version 2.0 (the "License");\r
+# you may not use this file except in compliance with the License.\r
+# You may obtain a copy of the License at\r
+#\r
+# http://www.apache.org/licenses/LICENSE-2.0\r
+#\r
+# Unless required by applicable law or agreed to in writing, software\r
+# distributed under the License is distributed on an "AS IS" BASIS,\r
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+# See the License for the specific language governing permissions and\r
+# limitations under the License.\r
+\r
+import logging\r
+import logging.config\r
+import logging.handlers\r
+import os\r
+import yaml\r
+\r
+\r
+def get_logger(name=None):\r
+ CONFIG_FILE = os.environ.get(\r
+ "LOGGING_CONFIG_FILE", "/etc/mock_smo/log.yaml")\r
+ if os.path.exists(CONFIG_FILE):\r
+ with open(file=CONFIG_FILE, mode='r', encoding="utf-8") as file:\r
+ config_yaml = yaml.load(stream=file, Loader=yaml.FullLoader)\r
+ logging.config.dictConfig(config=config_yaml)\r
+\r
+ logger = logging.getLogger(name)\r
+\r
+ # override logging level\r
+ LOGGING_CONFIG_LEVEL = os.environ.get("LOGGING_CONFIG_LEVEL", None)\r
+ if LOGGING_CONFIG_LEVEL:\r
+ logger.setLevel(LOGGING_CONFIG_LEVEL)\r
+ return logger\r
--- /dev/null
+#!/bin/sh
+
+mkdir -p /etc/mock_smo
+cp -r /tmp/etc/* /etc/mock_smo/
+mkdir -p /src/mock_smo
+cp -r /mock_smo/* /src/mock_smo
+
+# cp -r requirements.txt /src/requirements.txt
+
+pip install -e /src
+
+export FLASK_APP=/mock_smo/entrypoints/mock_smo.py
+flask run --host=0.0.0.0 --port=80
--- /dev/null
+flask\r
+flask-restx\r
+redis\r
+PyYAML>=5.4.1\r
+\r
--- /dev/null
+from setuptools import setup\r
+from setuptools import find_packages\r
+\r
+setup(\r
+ name="mock_smo",\r
+ version="1.0",\r
+ packages=find_packages(),\r
+ license="LICENSE",\r
+ description="Mock SMO server for O2 IMS and DMS",\r
+ install_requires=[\r
+ 'httplib2',\r
+ ]\r
+)\r
\r
def _collect_new_events(self):\r
for entry in self.oclouds.seen:\r
- while hasattr(entry, 'events') and len(entry.events) > 0:\r
+ # while hasattr(entry, 'events') and len(entry.events) > 0:\r
+ while entry.events is not None and len(entry.events) > 0:\r
yield entry.events.pop(0)\r
for entry in self.resource_pools.seen:\r
- while hasattr(entry, 'events') and len(entry.events) > 0:\r
+ while entry.events is not None and len(entry.events) > 0:\r
yield entry.events.pop(0)\r
for entry in self.resources.seen:\r
- while hasattr(entry, 'events') and len(entry.events) > 0:\r
+ while entry.events is not None and len(entry.events) > 0:\r
yield entry.events.pop(0)\r
for entry in self.resource_types.seen:\r
while hasattr(entry, 'events') and len(entry.events) > 0:\r
# limitations under the License.
# import json
-from logging import log
import redis
import json
from o2app import bootstrap
from o2common.config import config
# from o2common.domain import commands
from o2dms.domain import commands
-from o2dms.domain import events
+from o2ims.domain import commands as imscmd
from o2common.helper import o2logging
+from o2ims.domain.subscription_obj import Message2SMO
logger = o2logging.get_logger(__name__)
r = redis.Redis(**config.get_redis_host_and_port())
+apibase = config.get_o2ims_api_base()
+
def main():
logger.info("Redis pubsub starting")
bus = bootstrap.bootstrap()
pubsub = r.pubsub(ignore_subscribe_messages=True)
pubsub.subscribe("NfDeploymentStateChanged")
+ pubsub.subscribe('ResourceChanged')
for m in pubsub.listen():
try:
data = json.loads(datastr)
logger.info('HandleNfDeploymentStateChanged with cmd:{}'.format(data))
cmd = commands.HandleNfDeploymentStateChanged(
- NfDeploymentId = data['NfDeploymentId'],
- FromState = data['FromState'],
- ToState = data['ToState']
+ NfDeploymentId=data['NfDeploymentId'],
+ FromState=data['FromState'],
+ ToState=data['ToState']
)
bus.handle(cmd)
+ if channel == 'ResourceChanged':
+ datastr = m['data']
+ data = json.loads(datastr)
+ logger.info('ResourceChanged with cmd:{}'.format(data))
+ ref = apibase + '/resourcePools/' + data['resourcePoolId'] +\
+ '/resources/' + data['id']
+ cmd = imscmd.PubMessage2SMO(data=Message2SMO(
+ id=data['id'], ref=ref,
+ eventtype=data['notificationEventType'],
+ updatetime=data['updatetime']))
+ bus.handle(cmd)
else:
logger.info("unhandled:{}".format(channel))
super().__init__(worker_id)\r
self.args = args\r
self.bus = bootstrap.bootstrap()\r
- self.worker = PollWorker()\r
+ self.worker = PollWorker(bus=self.bus)\r
\r
def run(self):\r
try:\r
resourcepool_handler, pserver_handler, pserver_cpu_handler, \
pserver_mem_handler, pserver_port_handler, pserver_if_handler,\
pserver_eth_handler
+from o2ims.service.event import notify_handler, ocloud_event, \
+ resource_event, resource_pool_event
# if TYPE_CHECKING:
# from . import unit_of_work
EVENT_HANDLERS = {
o2dms_events.NfDeploymentStateChanged: [
nfdeployment_handler.publish_nfdeployment_state_change
- ]
+ ],
# o2dms_events.NfDeploymentCreated: [
# nfdeployment_handler.publish_nfdeployment_created],
# o2dms_events.NfDeploymentInstalled: [
# nfdeployment_handler.publish_nfdeployment_uninstalling],
# o2dms_events.NfDeploymentUninstalled: [
# nfdeployment_handler.publish_nfdeployment_uninstalled]
-} # type: Dict[Type[events.Event], Callable]
+ events.OcloudChanged: [ocloud_event.notify_ocloud_update],
+ events.ResourceChanged: [resource_event.notify_resource_change],
+ events.ResourcePoolChanged: [resource_pool_event.\
+ notify_resourcepool_change],
+} # type: Dict[Type[events.Event], Callable]
COMMAND_HANDLERS = {
commands.UpdatePserverIf: pserver_if_handler.update_pserver_if,
commands.UpdatePserverIfPort: pserver_port_handler.update_pserver_port,
commands.UpdatePserverEth: pserver_eth_handler.update_pserver_eth,
-
- o2dms_cmmands.HandleNfDeploymentStateChanged: \
- nfdeployment_handler.handle_nfdeployment_statechanged,
- o2dms_cmmands.InstallNfDeployment: \
- nfdeployment_handler.install_nfdeployment,
- o2dms_cmmands.UninstallNfDeployment: \
- nfdeployment_handler.uninstall_nfdeployment,
- o2dms_cmmands.DeleteNfDeployment: \
- nfdeployment_handler.delete_nfdeployment,
+ o2dms_cmmands.HandleNfDeploymentStateChanged:
+ nfdeployment_handler.handle_nfdeployment_statechanged,
+ o2dms_cmmands.InstallNfDeployment:
+ nfdeployment_handler.install_nfdeployment,
+ o2dms_cmmands.UninstallNfDeployment:
+ nfdeployment_handler.uninstall_nfdeployment,
+ o2dms_cmmands.DeleteNfDeployment:
+ nfdeployment_handler.delete_nfdeployment,
+ commands.PubMessage2SMO: notify_handler.notify_change_to_smo,
} # type: Dict[Type[commands.Command], Callable]
def publish(channel, event: events.Event):
logger.info("publishing: channel=%s, event=%s", channel, event)
- r.publish(channel, json.dumps(asdict(event)))
+ r.publish(channel, json.dumps(asdict(event), default=str))
from datetime import datetime\r
from typing import List\r
from sqlalchemy.inspection import inspect\r
+from sqlalchemy.exc import NoInspectionAvailable\r
from .events import Event\r
\r
\r
class AgRoot:\r
\r
+ events = []\r
+\r
def __init__(self) -> None:\r
self.hash = ""\r
self.updatetime = datetime.now()\r
self.events = [] # type: List[Event]\r
# self.id = ""\r
\r
+ # def append_event(self, event: Event):\r
+ # self.events = self.events.append(event)\r
+\r
\r
class Serializer(object):\r
\r
def serialize(self):\r
- # d = {c: getattr(self, c) for c in inspect(self).attrs.keys()}\r
- # if 'createtime' in d:\r
- # d['createtime'] = d['createtime'].isoformat()\r
- # if 'updatetime' in d:\r
- # d['updatetime'] = d['updatetime'].isoformat()\r
- # return d\r
- return {c: getattr(self, c) for c in inspect(self).attrs.keys()}\r
+ try:\r
+ # d = {c: getattr(self, c) for c in inspect(self).attrs.keys()}\r
+ # if 'createtime' in d:\r
+ # d['createtime'] = d['createtime'].isoformat()\r
+ # if 'updatetime' in d:\r
+ # d['updatetime'] = d['updatetime'].isoformat()\r
+ # return d\r
+ return {c: getattr(self, c) for c in inspect(self).attrs.keys()}\r
+ except NoInspectionAvailable:\r
+ return self.__dict__\r
\r
@staticmethod\r
def serialize_list(li):\r
\r
import time\r
import sched\r
+# from o2common.service.unit_of_work import AbstractUnitOfWork\r
from o2common.service.watcher.base import WatcherTree\r
\r
from o2common.helper import o2logging\r
\r
\r
class PollWorker(object):\r
- def __init__(self, interval=10) -> None:\r
+ def __init__(self, interval=10, bus=None) -> None:\r
super().__init__()\r
self.watchers = []\r
self.schedinstance = sched.scheduler(time.time, time.sleep)\r
self.schedinterval = interval\r
self._stopped = True\r
+ self._bus = bus\r
\r
def set_interval(self, interval):\r
if interval > 0:\r
except Exception as ex:\r
logger.warning("Worker raises exception:" + str(ex))\r
continue\r
+\r
+ # handle events\r
+ if self._bus is not None:\r
+ events = self._bus.uow.collect_new_events()\r
+ for event in events:\r
+ self._bus.handle(event)\r
+\r
self.schedinstance.enter(self.schedinterval, 1, self._repeat)\r
\r
# note the sched run will block current thread\r
logger.debug('host 1:' + str(hosts[0].to_dict()))\r
return [ocloudModel.StxGenericModel(\r
ResourceTypeEnum.PSERVER, self._hostconverter(host))\r
- for host in hosts if host]\r
+ for host in hosts if host and host.availability == 'available']\r
\r
def getPserver(self, id) -> ocloudModel.StxGenericModel:\r
host = self.stxclient.ihost.get(id)\r
from typing import List
-from o2ims.domain import ocloud
+from o2ims.domain import ocloud, subscription_obj
from o2ims.domain.ocloud_repo import OcloudRepository, ResourceTypeRepository,\
- ResourcePoolRepository, ResourceRepository, DeploymentManagerRepository,\
- SubscriptionRepository
+ ResourcePoolRepository, ResourceRepository, DeploymentManagerRepository
+from o2ims.domain.subscription_repo import SubscriptionRepository
from o2common.helper import o2logging
logger = o2logging.get_logger(__name__)
super().__init__()
self.session = session
- def _add(self, subscription: ocloud.Subscription):
+ def _add(self, subscription: subscription_obj.Subscription):
self.session.add(subscription)
- def _get(self, subscription_id) -> ocloud.Subscription:
- return self.session.query(ocloud.Subscription).filter_by(
+ def _get(self, subscription_id) -> subscription_obj.Subscription:
+ return self.session.query(subscription_obj.Subscription).filter_by(
subscriptionId=subscription_id).first()
- def _list(self) -> List[ocloud.Subscription]:
- return self.session.query(ocloud.Subscription)
+ def _list(self) -> List[subscription_obj.Subscription]:
+ return self.session.query(subscription_obj.Subscription)
- def _update(self, subscription: ocloud.Subscription):
+ def _update(self, subscription: subscription_obj.Subscription):
self.session.add(subscription)
def _delete(self, subscription_id):
- self.session.query(ocloud.Subscription).filter_by(
+ self.session.query(subscription_obj.Subscription).filter_by(
subscriptionId=subscription_id).delete()
# from sqlalchemy.sql.sqltypes import Integer\r
\r
from o2ims.domain import ocloud as ocloudModel\r
+from o2ims.domain import subscription_obj as subModel\r
from o2ims.domain.resource_type import ResourceTypeEnum\r
\r
from o2common.helper import o2logging\r
"resourcePools": relationship(resourcepool_mapper)\r
}\r
)\r
- mapper(ocloudModel.Subscription, subscription)\r
+ mapper(subModel.Subscription, subscription)\r
\r
if engine is not None:\r
metadata.create_all(engine)\r
# from datetime import datetime
# from o2ims.domain.resource_type import ResourceTypeEnum
from o2ims.domain.stx_object import StxGenericModel
+from o2ims.domain.subscription_obj import Message2SMO
from o2common.domain.commands import Command
data: StxGenericModel
+@dataclass
+class PubMessage2SMO(Command):
+ data: Message2SMO
+
+
@dataclass
class UpdateOCloud(UpdateStxObject):
pass
# pylint: disable=too-few-public-methods
from dataclasses import dataclass
+from datetime import datetime
from o2common.domain.events import Event
+from o2ims.domain.subscription_obj import NotificationEventEnum
@dataclass
-class OcloudUpdated(Event):
- oCloudId: str
+class OcloudChanged(Event):
+ id: str
+ notificationEventType: NotificationEventEnum
+ updatetime: datetime.now()
@dataclass
-class ResourceTypeUpdated(Event):
- oCloudId: str
+class ResourceTypeChanged(Event):
+ id: str
+ updatetime: datetime.now()
+
+
+@dataclass
+class ResourcePoolChanged(Event):
+ id: str
+ notificationEventType: NotificationEventEnum
+ updatetime: datetime.now()
+
+
+@dataclass
+class ResourceChanged(Event):
+ id: str
+ resourcePoolId: str
+ notificationEventType: NotificationEventEnum
+ updatetime: datetime.now()
# from uuid import UUID\r
\r
\r
-class Subscription(AgRoot, Serializer):\r
- def __init__(self, id: str, callback: str, consumersubid: str = '',\r
- filter: str = '') -> None:\r
- super().__init__()\r
- self.subscriptionId = id\r
- self.version_number = 0\r
- self.callback = callback\r
- self.consumerSubscriptionId = consumersubid\r
- self.filter = filter\r
-\r
-\r
class DeploymentManager(AgRoot, Serializer):\r
def __init__(self, id: str, name: str, ocloudid: str,\r
dmsendpoint: str, description: str = '',\r
@abc.abstractmethod\r
def _update(self, deployment_manager: ocloud.DeploymentManager):\r
raise NotImplementedError\r
-\r
-\r
-class SubscriptionRepository(abc.ABC):\r
- def __init__(self):\r
- self.seen = set() # type: Set[ocloud.Subscription]\r
-\r
- def add(self, subscription: ocloud.Subscription):\r
- self._add(subscription)\r
- self.seen.add(subscription)\r
-\r
- def get(self, subscription_id) -> ocloud.Subscription:\r
- subscription = self._get(subscription_id)\r
- if subscription:\r
- self.seen.add(subscription)\r
- return subscription\r
-\r
- def list(self) -> List[ocloud.Subscription]:\r
- return self._list()\r
-\r
- def update(self, subscription: ocloud.Subscription):\r
- self._update(subscription)\r
-\r
- def delete(self, subscription_id):\r
- self._delete(subscription_id)\r
-\r
- @abc.abstractmethod\r
- def _add(self, subscription: ocloud.Subscription):\r
- raise NotImplementedError\r
-\r
- @abc.abstractmethod\r
- def _get(self, subscription_id) -> ocloud.Subscription:\r
- raise NotImplementedError\r
-\r
- @abc.abstractmethod\r
- def _update(self, subscription: ocloud.Subscription):\r
- raise NotImplementedError\r
-\r
- @abc.abstractmethod\r
- def _delete(self, subscription_id):\r
- raise NotImplementedError\r
--- /dev/null
+# Copyright (C) 2021 Wind River Systems, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import annotations
+from enum import Enum
+from dataclasses import dataclass
+
+from o2common.domain.base import AgRoot, Serializer
+
+
+class Subscription(AgRoot, Serializer):
+ def __init__(self, id: str, callback: str, consumersubid: str = '',
+ filter: str = '') -> None:
+ super().__init__()
+ self.subscriptionId = id
+ self.version_number = 0
+ self.callback = callback
+ self.consumerSubscriptionId = consumersubid
+ self.filter = filter
+
+
+class NotificationEventEnum(str, Enum):
+ CREATE = 'CREATE'
+ MODIFY = 'MODIFY'
+ DELETE = 'DELETE'
+
+
+class Message2SMO(Serializer):
+ def __init__(self, eventtype: NotificationEventEnum,
+ id: str, ref: str, updatetime: str) -> None:
+ self.notificationEventType = eventtype
+ self.objectRef = ref
+ self.id = id
+ self.updatetime = updatetime
+
+
+@dataclass
+class EventState():
+ Initial = 0
+ NotInstalled = 1
+ Installing = 2
+ Installed = 3
+ Updating = 4
+ Uninstalling = 5
+ Abnormal = 6
+ Deleted = 7
--- /dev/null
+# Copyright (C) 2021 Wind River Systems, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import abc
+from typing import List, Set
+from o2ims.domain import subscription_obj as subobj
+
+
+class SubscriptionRepository(abc.ABC):
+ def __init__(self):
+ self.seen = set() # type: Set[subobj.Subscription]
+
+ def add(self, subscription: subobj.Subscription):
+ self._add(subscription)
+ self.seen.add(subscription)
+
+ def get(self, subscription_id) -> subobj.Subscription:
+ subscription = self._get(subscription_id)
+ if subscription:
+ self.seen.add(subscription)
+ return subscription
+
+ def list(self) -> List[subobj.Subscription]:
+ return self._list()
+
+ def update(self, subscription: subobj.Subscription):
+ self._update(subscription)
+
+ def delete(self, subscription_id):
+ self._delete(subscription_id)
+
+ @abc.abstractmethod
+ def _add(self, subscription: subobj.Subscription):
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def _get(self, subscription_id) -> subobj.Subscription:
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def _update(self, subscription: subobj.Subscription):
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def _delete(self, subscription_id):
+ raise NotImplementedError
# pylint: disable=unused-argument
from __future__ import annotations
+from typing import Callable
-from o2ims.domain.stx_object import StxGenericModel
# from dataclasses import asdict
# from typing import List, Dict, Callable, Type
# TYPE_CHECKING
-from o2ims.domain import commands
+
+from o2common.config import config
+# from o2common.service.messagebus import MessageBus
from o2common.service.unit_of_work import AbstractUnitOfWork
-from o2ims.domain.resource_type import InvalidOcloudState
-from o2ims.domain.resource_type import MismatchedModel
+from o2ims.domain import events, commands
from o2ims.domain.ocloud import Ocloud
-from o2common.config import config
+from o2ims.domain.stx_object import StxGenericModel
+from o2ims.domain.resource_type import InvalidOcloudState, MismatchedModel
+from o2ims.domain.subscription_obj import NotificationEventEnum
# if TYPE_CHECKING:
# from . import unit_of_work
def update_ocloud(
cmd: commands.UpdateOCloud,
- uow: AbstractUnitOfWork
+ uow: AbstractUnitOfWork,
+ publish: Callable
):
stxobj = cmd.data
with uow:
# ocloud.content = stxobj.content
ocloud.hash = stxobj.hash
ocloud.version_number = ocloud.version_number + 1
- ocloud.events = []
+ ocloud.events.append(events.OcloudChanged(
+ id=stxobj.id,
+ notificationEventType=NotificationEventEnum.MODIFY,
+ updatetime=stxobj.updatetime
+ ))
import uuid
# import json
-from o2ims.domain import commands
+from o2ims.domain import commands, events
from o2ims.domain.stx_object import StxGenericModel
from o2common.service.unit_of_work import AbstractUnitOfWork
from o2ims.domain.resource_type import MismatchedModel
from o2ims.domain.ocloud import Resource, ResourceType
+from o2ims.domain.subscription_obj import NotificationEventEnum
from o2common.helper import o2logging
logger = o2logging.get_logger(__name__)
target.updatetime = stxobj.updatetime
target.hash = stxobj.hash
target.version_number = target.version_number + 1
- target.events = []
+ target.events.append(events.ResourceChanged(
+ id=stxobj.id,
+ resourcePoolId=target.resourcePoolId,
+ notificationEventType=NotificationEventEnum.MODIFY,
+ updatetime=stxobj.updatetime
+ ))
import uuid
# import json
-from o2ims.domain import commands
+from o2ims.domain import commands, events
from o2ims.domain.stx_object import StxGenericModel
from o2common.service.unit_of_work import AbstractUnitOfWork
from o2ims.domain.resource_type import MismatchedModel
from o2ims.domain.ocloud import Resource, ResourceType
+from o2ims.domain.subscription_obj import NotificationEventEnum
from o2common.helper import o2logging
logger = o2logging.get_logger(__name__)
target.updatetime = stxobj.updatetime
target.hash = stxobj.hash
target.version_number = target.version_number + 1
- target.events = []
+ target.events.append(events.ResourceChanged(
+ id=stxobj.id,
+ resourcePoolId=target.resourcePoolId,
+ notificationEventType=NotificationEventEnum.MODIFY,
+ updatetime=stxobj.updatetime
+ ))
from __future__ import annotations
import uuid
# import json
+from typing import Callable
-from o2ims.domain import commands
+from o2ims.domain import commands, events
from o2ims.domain.stx_object import StxGenericModel
+from o2ims.domain.subscription_obj import NotificationEventEnum
from o2common.service.unit_of_work import AbstractUnitOfWork
from o2ims.domain.resource_type import MismatchedModel
from o2ims.domain.ocloud import Resource, ResourceType
def update_pserver(
cmd: commands.UpdatePserver,
- uow: AbstractUnitOfWork
+ uow: AbstractUnitOfWork,
+ publish: Callable
):
stxobj = cmd.data
with uow:
target.updatetime = stxobj.updatetime
target.hash = stxobj.hash
target.version_number = target.version_number + 1
- target.events = []
+ target.events.append(events.ResourceChanged(
+ id=stxobj.id,
+ resourcePoolId=target.resourcePoolId,
+ notificationEventType=NotificationEventEnum.MODIFY,
+ updatetime=stxobj.updatetime
+ ))
import uuid
# import json
-from o2ims.domain import commands
+from o2ims.domain import commands, events
from o2ims.domain.stx_object import StxGenericModel
from o2common.service.unit_of_work import AbstractUnitOfWork
from o2ims.domain.resource_type import MismatchedModel
from o2ims.domain.ocloud import Resource, ResourceType
+from o2ims.domain.subscription_obj import NotificationEventEnum
from o2common.helper import o2logging
logger = o2logging.get_logger(__name__)
target.updatetime = stxobj.updatetime
target.hash = stxobj.hash
target.version_number = target.version_number + 1
- target.events = []
+ target.events.append(events.ResourceChanged(
+ id=stxobj.id,
+ resourcePoolId=target.resourcePoolId,
+ notificationEventType=NotificationEventEnum.MODIFY,
+ updatetime=stxobj.updatetime
+ ))
import uuid
# import json
-from o2ims.domain import commands
+from o2ims.domain import commands, events
from o2ims.domain.stx_object import StxGenericModel
+from o2ims.domain.subscription_obj import NotificationEventEnum
from o2common.service.unit_of_work import AbstractUnitOfWork
from o2ims.domain.resource_type import MismatchedModel
from o2ims.domain.ocloud import Resource, ResourceType
target.updatetime = stxobj.updatetime
target.hash = stxobj.hash
target.version_number = target.version_number + 1
- target.events = []
+ target.events.append(events.ResourceChanged(
+ id=stxobj.id,
+ resourcePoolId=target.resourcePoolId,
+ notificationEventType=NotificationEventEnum.MODIFY,
+ updatetime=stxobj.updatetime
+ ))
import uuid
# import json
-from o2ims.domain import commands
+from o2ims.domain import commands, events
from o2ims.domain.stx_object import StxGenericModel
from o2common.service.unit_of_work import AbstractUnitOfWork
from o2ims.domain.resource_type import MismatchedModel
from o2ims.domain.ocloud import Resource, ResourceType
+from o2ims.domain.subscription_obj import NotificationEventEnum
from o2common.helper import o2logging
logger = o2logging.get_logger(__name__)
target.updatetime = stxobj.updatetime
target.hash = stxobj.hash
target.version_number = target.version_number + 1
- target.events = []
+ target.events.append(events.ResourceChanged(
+ id=stxobj.id,
+ resourcePoolId=target.resourcePoolId,
+ notificationEventType=NotificationEventEnum.MODIFY,
+ updatetime=stxobj.updatetime
+ ))
# from dataclasses import asdict
# from typing import List, Dict, Callable, Type
# TYPE_CHECKING
-from o2ims.domain import commands
+from o2ims.domain import commands, events
from o2common.service.unit_of_work import AbstractUnitOfWork
# from o2ims.domain.resource_type import InvalidOcloudState
from o2ims.domain.resource_type import MismatchedModel
from o2ims.domain.ocloud import ResourcePool
+from o2ims.domain.subscription_obj import NotificationEventEnum
# if TYPE_CHECKING:
# from . import unit_of_work
target.hash = stxobj.hash
target.oCloudId = parentid
target.version_number = target.version_number + 1
- target.events = []
+ target.events.append(events.ResourcePoolChanged(
+ id=stxobj.id,
+ notificationEventType=NotificationEventEnum.MODIFY,
+ updatetime=stxobj.updatetime
+ ))
--- /dev/null
+# Copyright (C) 2021 Wind River Systems, Inc.\r
+#\r
+# Licensed under the Apache License, Version 2.0 (the "License");\r
+# you may not use this file except in compliance with the License.\r
+# You may obtain a copy of the License at\r
+#\r
+# http://www.apache.org/licenses/LICENSE-2.0\r
+#\r
+# Unless required by applicable law or agreed to in writing, software\r
+# distributed under the License is distributed on an "AS IS" BASIS,\r
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+# See the License for the specific language governing permissions and\r
+# limitations under the License.\r
--- /dev/null
+# Copyright (C) 2021 Wind River Systems, Inc.\r
+#\r
+# Licensed under the Apache License, Version 2.0 (the "License");\r
+# you may not use this file except in compliance with the License.\r
+# You may obtain a copy of the License at\r
+#\r
+# http://www.apache.org/licenses/LICENSE-2.0\r
+#\r
+# Unless required by applicable law or agreed to in writing, software\r
+# distributed under the License is distributed on an "AS IS" BASIS,\r
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+# See the License for the specific language governing permissions and\r
+# limitations under the License.\r
+\r
+import json\r
+import redis\r
+# import requests\r
+import http.client\r
+from urllib.parse import urlparse\r
+\r
+from o2common.config import config\r
+from o2common.service.unit_of_work import AbstractUnitOfWork\r
+from o2ims.domain import commands\r
+from o2ims.domain.subscription_obj import Subscription, Message2SMO\r
+\r
+from o2common.helper import o2logging\r
+logger = o2logging.get_logger(__name__)\r
+\r
+# Maybe another MQ server\r
+r = redis.Redis(**config.get_redis_host_and_port())\r
+\r
+\r
+def notify_change_to_smo(\r
+ cmd: commands.PubMessage2SMO,\r
+ uow: AbstractUnitOfWork,\r
+):\r
+ logger.info('In notify_change_to_smo')\r
+ data = cmd.data\r
+ with uow:\r
+ subs = uow.subscriptions.list()\r
+ for sub in subs:\r
+ sub_data = sub.serialize()\r
+ logger.debug('Subscription: {}'.format(sub_data['subscriptionId']))\r
+\r
+ try:\r
+ resource_filter = json.loads(sub_data['filter'])\r
+ if len(resource_filter) > 0:\r
+ resource = uow.resources.get(data.id)\r
+ logger.debug(type(resource))\r
+ if resource: # TODO deal with resource is empty\r
+ res_type_id = resource.serialize()['resourceTypeId']\r
+ resourcetype = uow.resource_types.get(res_type_id)\r
+ logger.debug(resourcetype.name)\r
+ if resourcetype.name not in resource_filter:\r
+ continue\r
+ except json.decoder.JSONDecodeError as err:\r
+ logger.warning(\r
+ 'subscription filter decode json failed: {}'.format(err))\r
+\r
+ callback_smo(sub, data)\r
+\r
+\r
+def callback_smo(sub: Subscription, msg: Message2SMO):\r
+ sub_data = sub.serialize()\r
+ callback_data = json.dumps({\r
+ 'consumerSubscriptionId': sub_data['consumerSubscriptionId'],\r
+ 'notificationEventType': msg.notificationEventType,\r
+ 'objectRef': msg.objectRef,\r
+ 'updateTime': msg.updatetime\r
+ })\r
+ logger.info('URL: {}, data: {}'.format(\r
+ sub_data['callback'], callback_data))\r
+ # r.publish(sub_data['subscriptionId'], json.dumps({\r
+ # 'consumerSubscriptionId': sub_data['consumerSubscriptionId'],\r
+ # 'notificationEventType': msg.notificationEventType,\r
+ # 'objectRef': msg.objectRef\r
+ # }))\r
+ # try:\r
+ # headers = {'User-Agent': 'Mozilla/5.0'}\r
+ # resp = requests.post(sub_data['callback'], data=callback_data,\r
+ # headers=headers)\r
+ # if resp.status_code == 202 or resp.status_code == 200:\r
+ # logger.info('Notify to SMO successed')\r
+ # return\r
+ # logger.error('Response code is: {}'.format(resp.status_code))\r
+ # except requests.exceptions.HTTPError as err:\r
+ # logger.error('request smo error: {}'.format(err))\r
+ o = urlparse(sub_data['callback'])\r
+ conn = http.client.HTTPConnection(o.netloc)\r
+ headers = {'Content-type': 'application/json'}\r
+ conn.request('POST', o.path, callback_data, headers)\r
+ resp = conn.getresponse()\r
+ data = resp.read().decode('utf-8')\r
+ # json_data = json.loads(data)\r
+ if resp.status == 202 or resp.status == 200:\r
+ logger.info('Notify to SMO successed, response code {} {}, data {}'.\r
+ format(resp.status, resp.reason, data))\r
+ return\r
+ logger.error('Response code is: {}'.format(resp.status))\r
--- /dev/null
+# Copyright (C) 2021 Wind River Systems, Inc.\r
+#\r
+# Licensed under the Apache License, Version 2.0 (the "License");\r
+# you may not use this file except in compliance with the License.\r
+# You may obtain a copy of the License at\r
+#\r
+# http://www.apache.org/licenses/LICENSE-2.0\r
+#\r
+# Unless required by applicable law or agreed to in writing, software\r
+# distributed under the License is distributed on an "AS IS" BASIS,\r
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+# See the License for the specific language governing permissions and\r
+# limitations under the License.\r
+\r
+from typing import Callable\r
+\r
+from o2ims.domain import events\r
+\r
+from o2common.helper import o2logging\r
+logger = o2logging.get_logger(__name__)\r
+\r
+\r
+def notify_ocloud_update(\r
+ event: events.OcloudChanged,\r
+ publish: Callable,\r
+):\r
+ logger.info('In notify_ocloud_update')\r
+ publish("OcloudChanged", event)\r
+ logger.debug("published Ocloud Changed: {}".format(\r
+ event.id))\r
--- /dev/null
+# Copyright (C) 2021 Wind River Systems, Inc.\r
+#\r
+# Licensed under the Apache License, Version 2.0 (the "License");\r
+# you may not use this file except in compliance with the License.\r
+# You may obtain a copy of the License at\r
+#\r
+# http://www.apache.org/licenses/LICENSE-2.0\r
+#\r
+# Unless required by applicable law or agreed to in writing, software\r
+# distributed under the License is distributed on an "AS IS" BASIS,\r
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+# See the License for the specific language governing permissions and\r
+# limitations under the License.\r
+\r
+from typing import Callable\r
+\r
+from o2ims.domain import events\r
+\r
+from o2common.helper import o2logging\r
+logger = o2logging.get_logger(__name__)\r
+\r
+\r
+def notify_resource_change(\r
+ event: events.ResourceChanged,\r
+ publish: Callable,\r
+):\r
+ logger.info('In notify_resource_change')\r
+ publish("ResourceChanged", event)\r
+ logger.debug("published Resource Changed: {}".format(\r
+ event.id))\r
--- /dev/null
+# Copyright (C) 2021 Wind River Systems, Inc.\r
+#\r
+# Licensed under the Apache License, Version 2.0 (the "License");\r
+# you may not use this file except in compliance with the License.\r
+# You may obtain a copy of the License at\r
+#\r
+# http://www.apache.org/licenses/LICENSE-2.0\r
+#\r
+# Unless required by applicable law or agreed to in writing, software\r
+# distributed under the License is distributed on an "AS IS" BASIS,\r
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+# See the License for the specific language governing permissions and\r
+# limitations under the License.\r
+\r
+from typing import Callable\r
+\r
+from o2ims.domain import events\r
+\r
+from o2common.helper import o2logging\r
+logger = o2logging.get_logger(__name__)\r
+\r
+\r
+def notify_resourcepool_change(\r
+ event: events.ResourcePoolChanged,\r
+ publish: Callable,\r
+):\r
+ logger.info('In notify_resourcepool_change')\r
+ publish("ResourcePoolChanged", event)\r
+ logger.debug("published Resource Pool Changed: {}".format(\r
+ event.id))\r
\r
from o2common.service import unit_of_work\r
from o2ims.views.ocloud_dto import SubscriptionDTO\r
-from o2ims.domain.ocloud import Subscription\r
+from o2ims.domain.subscription_obj import Subscription\r
\r
\r
def oclouds(uow: unit_of_work.AbstractUnitOfWork):\r
uow: unit_of_work.AbstractUnitOfWork):\r
with uow:\r
first = uow.resource_types.get(resourceTypeId)\r
- print(first)\r
return first.serialize() if first is not None else None\r
\r
\r
tox\r
\r
pytest\r
+pytest-cov\r
pytest-icdiff\r
mock\r
\r
from o2ims.domain import resource_type as rt
from o2ims.adapter import ocloud_repository as repository
-from o2ims.domain import ocloud
+from o2ims.domain import ocloud, subscription_obj
from o2common.config import config
pytestmark = pytest.mark.usefixtures("mappers")
session = sqlite_session_factory()
repo = repository.SubscriptionSqlAlchemyRepository(session)
subscription_id1 = str(uuid.uuid4())
- subscription1 = ocloud.Subscription(
+ subscription1 = subscription_obj.Subscription(
subscription_id1, "https://callback/uri/write/here")
repo.add(subscription1)
assert repo.get(subscription_id1) == subscription1
from o2common.config import config
from o2ims.views import ocloud_view
-from o2ims.domain import ocloud
+from o2ims.domain import ocloud, subscription_obj
from o2ims.domain import resource_type as rt
def test_view_subscriptions(sqlite_uow):
subscription_id1 = str(uuid.uuid4())
- subscription1 = ocloud.Subscription(
+ subscription1 = subscription_obj.Subscription(
subscription_id1, "https://callback/uri/write/here")
with sqlite_uow as uow:
uow.subscriptions.add(subscription1)
def test_view_subscription_one(sqlite_uow):
subscription_id1 = str(uuid.uuid4())
- subscription1 = ocloud.Subscription(
+ subscription1 = subscription_obj.Subscription(
subscription_id1, "https://callback/uri/write/here")
# Query return None
def test_view_subscription_delete(sqlite_uow):
subscription_id1 = str(uuid.uuid4())
- subscription1 = ocloud.Subscription(
+ subscription1 = subscription_obj.Subscription(
subscription_id1, "https://callback/uri/write/here")
with sqlite_uow as uow:
--- /dev/null
+# Copyright (C) 2021 Wind River Systems, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import redis
+import json
+import signal
+import http.client
+# from o2common.config import config
+
+# from o2common.helper import o2logging
+# logger = o2logging.get_logger(__name__)
+
+# r = redis.Redis(**config.get_redis_host_and_port())
+r = redis.Redis(host='127.0.0.1', port=63791)
+
+# apibase = config.get_o2ims_api_base()
+# url = config.get_api_url()
+apibase = '/o2ims_infrastructureInventory/v1'
+url = '127.0.0.1:5005'
+
+
+class Singleton(type):
+ _instances = {}
+
+ def __call__(cls, *args, **kwargs):
+ if cls not in cls._instances:
+ cls._instances[cls] = super(
+ Singleton, cls).__call__(*args, **kwargs)
+ return cls._instances[cls]
+
+
+class Subscription(metaclass=Singleton):
+
+ def __init__(self, sub_id='') -> None:
+ self.url = url
+ self.subId = sub_id
+
+ def subscription_ims(self):
+ conn = http.client.HTTPConnection(self.url)
+ headers = {'Content-type': 'application/json'}
+ post_val = {
+ 'callback': self.url,
+ 'consumerSubscriptionId': 'mock_smo',
+ 'filter': '["pserver","pserver_ram"]'
+ }
+ json_val = json.dumps(post_val)
+ conn.request('POST', apibase+'/subscriptions', json_val, headers)
+ resp = conn.getresponse()
+ data = resp.read().decode('utf-8')
+ print(resp.status, resp.reason)
+ print(data)
+ json_data = json.loads(data)
+ self.subId = json_data['subscriptionId']
+
+ def subscription_mq(self):
+ sub = r.pubsub(ignore_subscribe_messages=True)
+ sub.subscribe(self.subId)
+
+ for m in sub.listen():
+ try:
+ # logger.info("handling %s", m)
+ print("handling %s", m)
+ channel = m['channel'].decode("UTF-8")
+ if channel == self.subId:
+ datastr = m['data']
+ data = json.loads(datastr)
+ # logger.info('notification: {}'.format(data))
+ print('notification: {}'.format(data))
+ else:
+ # logger.info("unhandled:{}".format(channel))
+ print("unhandled:{}".format(channel))
+ except Exception as ex:
+ # logger.warning("{}".format(str(ex)))
+ print("[WARNING]{}".format(str(ex)))
+ continue
+
+ def unsubscription_ims(self):
+ conn = http.client.HTTPConnection(self.url)
+ conn.request('DELETE', apibase + '/subscriptions/' + self.subId)
+ resp = conn.getresponse()
+ print(resp.status, resp.reason)
+
+
+def handler(signum, frame):
+ print('\nCtrl-c was pressed. Call to delete subscription')
+ sub = Subscription()
+ sub.unsubscription_ims()
+ exit()
+
+
+def main():
+ sub = Subscription()
+ sub.subscription_ims()
+ signal.signal(signal.SIGINT, handler)
+ sub.subscription_mq()
+
+
+if __name__ == "__main__":
+ main()
import uuid
from unittest.mock import MagicMock
-from o2ims.domain import ocloud
+from o2ims.domain import ocloud, subscription_obj
from o2ims.domain import resource_type as rt
from o2ims.views import ocloud_view
from o2common.config import config
def test_new_subscription():
subscription_id1 = str(uuid.uuid4())
- subscription1 = ocloud.Subscription(
+ subscription1 = subscription_obj.Subscription(
subscription_id1, "https://callback/uri/write/here")
assert subscription_id1 is not None and\
subscription1.subscriptionId == subscription_id1
import time\r
from datetime import datetime\r
import json\r
-from typing import List\r
+from typing import Callable, List\r
# from o2common.config import config\r
import uuid\r
from o2common.service.watcher.base import BaseWatcher, WatcherTree\r
def create_fake_bus(uow):\r
def update_ocloud(\r
cmd: commands.UpdateOCloud,\r
- uow: AbstractUnitOfWork):\r
+ uow: AbstractUnitOfWork,\r
+ publish: Callable):\r
return\r
\r
fakeuow = FakeUnitOfWork()\r