From: Zhang Rong(Jon) Date: Thu, 16 Dec 2021 16:50:35 +0000 (+0800) Subject: Add subscription and notification for resource changes; fix a bug while pserver node... X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=commitdiff_plain;h=5bf7465e213fa92b6514446e353e6f2da436371f;p=pti%2Fo2.git Add subscription and notification for resource changes; fix a bug while pserver node is uninstalled status 1. Trigger an event while resource changed, create a command handler to deal the event, let it can callback to SMO. 2. Create a mock SMO server with a simple html page to subscribe to O2IMS resource changing. 3. Fix a bug that when watch the pserver that it has an unavailable node. Issue-ID: INF-238 Signed-off-by: Zhang Rong(Jon) Change-Id: I13304656a721dbe5d4aec23200063e874eefa521 (cherry picked from commit 9625c5b766377f641d9641471f10dd491a61447f) --- diff --git a/Dockerfile.localtest b/Dockerfile.localtest index eee30ef..0324e33 100644 --- a/Dockerfile.localtest +++ b/Dockerfile.localtest @@ -52,6 +52,6 @@ RUN curl -O https://get.helm.sh/helm-v3.3.1-linux-amd64.tar.gz; RUN tar -zxvf helm-v3.3.1-linux-amd64.tar.gz; cp linux-amd64/helm /usr/local/bin RUN mkdir -p /etc/kubeconfig/ -COPY temp/kubeconfig/config /etc/kubeconfig/ +# COPY temp/kubeconfig/config /etc/kubeconfig/ WORKDIR /src diff --git a/docker-compose.yml b/docker-compose.yml index 477441a..563ab4b 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -93,6 +93,31 @@ services: - /bin/sh - /tests/o2app-watcher-entry.sh + mock_smo: + build: + context: ./mock_smo + dockerfile: Dockerfile + image: mock-smo + depends_on: + - mock_smo_redis + environment: + - API_HOST=api + - REDIS_HOST=mock_smo_redis + - MOCK_SMO_HOST=mock_smo + - PYTHONDONTWRITEBYTECODE=1 + - FLASK_APP=/mock_smo/entrypoints/mock_smo.py + - FLASK_DEBUG=1 + - PYTHONUNBUFFERED=1 + - LOGGING_CONFIG_LEVEL=DEBUG + volumes: + - ./mock_smo/etc:/tmp/etc + - ./mock_smo/mock_smo:/mock_smo + entrypoint: + - /bin/sh + - /src/o2app-mock-smo.sh + ports: + - "5001:80" + postgres: image: postgres:9.6 environment: @@ -105,3 +130,8 @@ services: image: redis:alpine ports: - "63791:6379" + + mock_smo_redis: + image: redis:alpine + ports: + - "63792:6379" diff --git a/mock_smo/Dockerfile b/mock_smo/Dockerfile new file mode 100644 index 0000000..74d6b61 --- /dev/null +++ b/mock_smo/Dockerfile @@ -0,0 +1,23 @@ +FROM python:3.10-slim-buster + +RUN apt-get update; apt-get install -y git gcc + +COPY requirements.txt /tmp/ +RUN pip install -r /tmp/requirements.txt + +# COPY requirements-test.txt /tmp/ +# RUN pip install -r /tmp/requirements-test.txt + +RUN mkdir -p /src +COPY mock_smo/ /src/mock_smo/ + +COPY setup.py o2app-mock-smo.sh /src/ +RUN pip install -e /src + +COPY etc/ /etc/mock_smo/ + +# COPY tests/ /tests/ + +# RUN apt-get install -y procps vim + +WORKDIR /src diff --git a/mock_smo/etc/log.yaml b/mock_smo/etc/log.yaml new file mode 100644 index 0000000..59ab8eb --- /dev/null +++ b/mock_smo/etc/log.yaml @@ -0,0 +1,49 @@ +# Copyright (C) 2021 Wind River Systems, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +version: 1 +disable_existing_loggers: False + +loggers: + root: + handlers: [console_handler, file_handler] + level: "WARNING" + propagate: False + o2common: + handlers: [console_handler, file_handler] + level: "WARNING" + propagate: False + o2ims: + handlers: [console_handler, file_handler] + level: "DEBUG" + propagate: False + o2dms: + handlers: [console_handler, file_handler] + level: "DEBUG" + propagate: False +handlers: + console_handler: + level: "DEBUG" + class: "logging.StreamHandler" + formatter: "standard" + file_handler: + level: "DEBUG" + class: "logging.handlers.RotatingFileHandler" + filename: "/var/log/mock_smo.log" + formatter: "standard" + maxBytes: 52428800 + backupCount: 10 +formatters: + standard: + format: "%(asctime)s:[%(name)s]:[%(filename)s]-[%(lineno)d] [%(levelname)s]:%(message)s" diff --git a/mock_smo/mock_smo/__init__.py b/mock_smo/mock_smo/__init__.py new file mode 100644 index 0000000..b514342 --- /dev/null +++ b/mock_smo/mock_smo/__init__.py @@ -0,0 +1,13 @@ +# Copyright (C) 2021 Wind River Systems, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/mock_smo/mock_smo/config.py b/mock_smo/mock_smo/config.py new file mode 100644 index 0000000..ab5d607 --- /dev/null +++ b/mock_smo/mock_smo/config.py @@ -0,0 +1,48 @@ +# Copyright (C) 2021 Wind River Systems, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os + +import mock_smo.logging as logging +logger = logging.get_logger(__name__) + + +def get_mock_smo_api_url(): + host = os.environ.get("API_HOST", "localhost") + port = 5001 if host == "localhost" else 80 + return f"http://{host}:{port}" + + +def get_root_api_base(): + return "/" + + +def get_o2ims_api_base(): + return get_root_api_base() + 'o2ims_infrastructureInventory/v1' + + +def get_o2dms_api_base(): + return get_root_api_base() + "o2dms" + + +def get_redis_host_and_port(): + host = os.environ.get("REDIS_HOST", "localhost") + port = 63792 if host == "localhost" else 6379 + return dict(host=host, port=port) + + +def get_smo_o2endpoint(): + smo_o2endpoint = os.environ.get( + "SMO_O2_ENDPOINT", "http://localhost/smo_sim") + return smo_o2endpoint diff --git a/mock_smo/mock_smo/entrypoints/__init__.py b/mock_smo/mock_smo/entrypoints/__init__.py new file mode 100644 index 0000000..b514342 --- /dev/null +++ b/mock_smo/mock_smo/entrypoints/__init__.py @@ -0,0 +1,13 @@ +# Copyright (C) 2021 Wind River Systems, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/mock_smo/mock_smo/entrypoints/mock_smo.py b/mock_smo/mock_smo/entrypoints/mock_smo.py new file mode 100644 index 0000000..f5c5895 --- /dev/null +++ b/mock_smo/mock_smo/entrypoints/mock_smo.py @@ -0,0 +1,116 @@ +# Copyright (C) 2021 Wind River Systems, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json +import redis +import http.client +from flask import Flask, request +from flask.helpers import url_for + +import mock_smo.config as config +import mock_smo.logging as logging +logger = logging.get_logger(__name__) + +apibase = config.get_o2ims_api_base() +app = Flask(__name__) + +r = redis.Redis(**config.get_redis_host_and_port()) +REDIS_SUB_KEY = 'mock_smo_sub_key' +REDIS_O2IMS_URL = 'mock_smo_o2ims_url' + + +@app.route('/', methods=['GET', 'POST']) +def index(): + if request.method == 'POST': + url = request.form['url'] + consumerSubscriptionId = request.form['consumerSubId'] + sub_id = subscription_ims(url, consumerSubscriptionId) + return """ +

Subscribed O2IMS

+

Subscription ID: %s

+

Subscribed O2IMS URL: %s

+ + + +""" % (sub_id, url, url_for('unsubscription')) + return """ +

Subscribe O2IMS

+
+ +

+ +

+ +
+""" + + +@app.route('/unsubscription') +def unsubscription(): + sub_key = r.get(REDIS_SUB_KEY) + logger.info('Subscription key is {}'.format(sub_key)) + if sub_key is None: + return '

Already unsubscribed

' + url = r.get(REDIS_O2IMS_URL).decode('utf-8') + logger.info('O2 IMS API is: {}'.format(url)) + unsubscription_ims(url, sub_key.decode('utf-8')) + r.delete(REDIS_O2IMS_URL) + r.delete(REDIS_SUB_KEY) + return """ +

Unsubscribed O2IMS

+ + + +""" + + +@app.route('/callback', methods=['POST']) +def callback(): + logger.info('Callback data: {}'.format(request.get_data())) + return '', 202 + + +def subscription_ims(url, consumerSubscriptionId): + sub_key = r.get(REDIS_SUB_KEY) + logger.info('Subscription key is {}'.format(sub_key)) + if sub_key is not None: + return sub_key.decode('utf-8') + + logger.info(request.host_url) + conn = http.client.HTTPConnection(url) + headers = {'Content-type': 'application/json'} + post_val = { + 'callback': 'http://mock_smo:80' + url_for('callback'), + 'consumerSubscriptionId': consumerSubscriptionId, + 'filter': '["pserver"]' # '["pserver","pserver_mem"]' + } + json_val = json.dumps(post_val) + conn.request('POST', apibase+'/subscriptions', json_val, headers) + resp = conn.getresponse() + data = resp.read().decode('utf-8') + logger.info('Subscription response: {} {}, data: {}'.format( + resp.status, resp.reason, data)) + json_data = json.loads(data) + + r.set(REDIS_SUB_KEY, json_data['subscriptionId']) + r.set(REDIS_O2IMS_URL, url) + return json_data['subscriptionId'] + + +def unsubscription_ims(url, subId): + conn = http.client.HTTPConnection(url) + conn.request('DELETE', apibase + '/subscriptions/' + subId) + resp = conn.getresponse() + logger.info('Unsubscription response: {} {}'.format( + resp.status, resp.reason)) diff --git a/mock_smo/mock_smo/logging.py b/mock_smo/mock_smo/logging.py new file mode 100644 index 0000000..cb3b504 --- /dev/null +++ b/mock_smo/mock_smo/logging.py @@ -0,0 +1,36 @@ +# Copyright (C) 2021 Wind River Systems, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import logging.config +import logging.handlers +import os +import yaml + + +def get_logger(name=None): + CONFIG_FILE = os.environ.get( + "LOGGING_CONFIG_FILE", "/etc/mock_smo/log.yaml") + if os.path.exists(CONFIG_FILE): + with open(file=CONFIG_FILE, mode='r', encoding="utf-8") as file: + config_yaml = yaml.load(stream=file, Loader=yaml.FullLoader) + logging.config.dictConfig(config=config_yaml) + + logger = logging.getLogger(name) + + # override logging level + LOGGING_CONFIG_LEVEL = os.environ.get("LOGGING_CONFIG_LEVEL", None) + if LOGGING_CONFIG_LEVEL: + logger.setLevel(LOGGING_CONFIG_LEVEL) + return logger diff --git a/mock_smo/o2app-mock-smo.sh b/mock_smo/o2app-mock-smo.sh new file mode 100644 index 0000000..137491b --- /dev/null +++ b/mock_smo/o2app-mock-smo.sh @@ -0,0 +1,13 @@ +#!/bin/sh + +mkdir -p /etc/mock_smo +cp -r /tmp/etc/* /etc/mock_smo/ +mkdir -p /src/mock_smo +cp -r /mock_smo/* /src/mock_smo + +# cp -r requirements.txt /src/requirements.txt + +pip install -e /src + +export FLASK_APP=/mock_smo/entrypoints/mock_smo.py +flask run --host=0.0.0.0 --port=80 diff --git a/mock_smo/requirements.txt b/mock_smo/requirements.txt new file mode 100644 index 0000000..0b2acc6 --- /dev/null +++ b/mock_smo/requirements.txt @@ -0,0 +1,5 @@ +flask +flask-restx +redis +PyYAML>=5.4.1 + diff --git a/mock_smo/setup.py b/mock_smo/setup.py new file mode 100644 index 0000000..3afc7fc --- /dev/null +++ b/mock_smo/setup.py @@ -0,0 +1,13 @@ +from setuptools import setup +from setuptools import find_packages + +setup( + name="mock_smo", + version="1.0", + packages=find_packages(), + license="LICENSE", + description="Mock SMO server for O2 IMS and DMS", + install_requires=[ + 'httplib2', + ] +) diff --git a/o2app/adapter/unit_of_work.py b/o2app/adapter/unit_of_work.py index 6fd4ae2..facfc45 100644 --- a/o2app/adapter/unit_of_work.py +++ b/o2app/adapter/unit_of_work.py @@ -73,13 +73,14 @@ class SqlAlchemyUnitOfWork(AbstractUnitOfWork): def _collect_new_events(self): for entry in self.oclouds.seen: - while hasattr(entry, 'events') and len(entry.events) > 0: + # while hasattr(entry, 'events') and len(entry.events) > 0: + while entry.events is not None and len(entry.events) > 0: yield entry.events.pop(0) for entry in self.resource_pools.seen: - while hasattr(entry, 'events') and len(entry.events) > 0: + while entry.events is not None and len(entry.events) > 0: yield entry.events.pop(0) for entry in self.resources.seen: - while hasattr(entry, 'events') and len(entry.events) > 0: + while entry.events is not None and len(entry.events) > 0: yield entry.events.pop(0) for entry in self.resource_types.seen: while hasattr(entry, 'events') and len(entry.events) > 0: diff --git a/o2app/entrypoints/redis_eventconsumer.py b/o2app/entrypoints/redis_eventconsumer.py index 8472949..ea49edd 100644 --- a/o2app/entrypoints/redis_eventconsumer.py +++ b/o2app/entrypoints/redis_eventconsumer.py @@ -13,26 +13,29 @@ # limitations under the License. # import json -from logging import log import redis import json from o2app import bootstrap from o2common.config import config # from o2common.domain import commands from o2dms.domain import commands -from o2dms.domain import events +from o2ims.domain import commands as imscmd from o2common.helper import o2logging +from o2ims.domain.subscription_obj import Message2SMO logger = o2logging.get_logger(__name__) r = redis.Redis(**config.get_redis_host_and_port()) +apibase = config.get_o2ims_api_base() + def main(): logger.info("Redis pubsub starting") bus = bootstrap.bootstrap() pubsub = r.pubsub(ignore_subscribe_messages=True) pubsub.subscribe("NfDeploymentStateChanged") + pubsub.subscribe('ResourceChanged') for m in pubsub.listen(): try: @@ -50,11 +53,22 @@ def handle_dms_changed(m, bus): data = json.loads(datastr) logger.info('HandleNfDeploymentStateChanged with cmd:{}'.format(data)) cmd = commands.HandleNfDeploymentStateChanged( - NfDeploymentId = data['NfDeploymentId'], - FromState = data['FromState'], - ToState = data['ToState'] + NfDeploymentId=data['NfDeploymentId'], + FromState=data['FromState'], + ToState=data['ToState'] ) bus.handle(cmd) + if channel == 'ResourceChanged': + datastr = m['data'] + data = json.loads(datastr) + logger.info('ResourceChanged with cmd:{}'.format(data)) + ref = apibase + '/resourcePools/' + data['resourcePoolId'] +\ + '/resources/' + data['id'] + cmd = imscmd.PubMessage2SMO(data=Message2SMO( + id=data['id'], ref=ref, + eventtype=data['notificationEventType'], + updatetime=data['updatetime'])) + bus.handle(cmd) else: logger.info("unhandled:{}".format(channel)) diff --git a/o2app/entrypoints/resource_watcher.py b/o2app/entrypoints/resource_watcher.py index a7bf4b3..38308eb 100644 --- a/o2app/entrypoints/resource_watcher.py +++ b/o2app/entrypoints/resource_watcher.py @@ -54,7 +54,7 @@ class WatcherService(cotyledon.Service): super().__init__(worker_id) self.args = args self.bus = bootstrap.bootstrap() - self.worker = PollWorker() + self.worker = PollWorker(bus=self.bus) def run(self): try: diff --git a/o2app/service/handlers.py b/o2app/service/handlers.py index ebbf9c0..0665cf2 100644 --- a/o2app/service/handlers.py +++ b/o2app/service/handlers.py @@ -27,6 +27,8 @@ from o2ims.service.auditor import ocloud_handler, dms_handler, \ resourcepool_handler, pserver_handler, pserver_cpu_handler, \ pserver_mem_handler, pserver_port_handler, pserver_if_handler,\ pserver_eth_handler +from o2ims.service.event import notify_handler, ocloud_event, \ + resource_event, resource_pool_event # if TYPE_CHECKING: # from . import unit_of_work @@ -39,7 +41,7 @@ class InvalidResourceType(Exception): EVENT_HANDLERS = { o2dms_events.NfDeploymentStateChanged: [ nfdeployment_handler.publish_nfdeployment_state_change - ] + ], # o2dms_events.NfDeploymentCreated: [ # nfdeployment_handler.publish_nfdeployment_created], # o2dms_events.NfDeploymentInstalled: [ @@ -48,7 +50,11 @@ EVENT_HANDLERS = { # nfdeployment_handler.publish_nfdeployment_uninstalling], # o2dms_events.NfDeploymentUninstalled: [ # nfdeployment_handler.publish_nfdeployment_uninstalled] -} # type: Dict[Type[events.Event], Callable] + events.OcloudChanged: [ocloud_event.notify_ocloud_update], + events.ResourceChanged: [resource_event.notify_resource_change], + events.ResourcePoolChanged: [resource_pool_event.\ + notify_resourcepool_change], +} # type: Dict[Type[events.Event], Callable] COMMAND_HANDLERS = { @@ -61,13 +67,13 @@ COMMAND_HANDLERS = { commands.UpdatePserverIf: pserver_if_handler.update_pserver_if, commands.UpdatePserverIfPort: pserver_port_handler.update_pserver_port, commands.UpdatePserverEth: pserver_eth_handler.update_pserver_eth, - - o2dms_cmmands.HandleNfDeploymentStateChanged: \ - nfdeployment_handler.handle_nfdeployment_statechanged, - o2dms_cmmands.InstallNfDeployment: \ - nfdeployment_handler.install_nfdeployment, - o2dms_cmmands.UninstallNfDeployment: \ - nfdeployment_handler.uninstall_nfdeployment, - o2dms_cmmands.DeleteNfDeployment: \ - nfdeployment_handler.delete_nfdeployment, + o2dms_cmmands.HandleNfDeploymentStateChanged: + nfdeployment_handler.handle_nfdeployment_statechanged, + o2dms_cmmands.InstallNfDeployment: + nfdeployment_handler.install_nfdeployment, + o2dms_cmmands.UninstallNfDeployment: + nfdeployment_handler.uninstall_nfdeployment, + o2dms_cmmands.DeleteNfDeployment: + nfdeployment_handler.delete_nfdeployment, + commands.PubMessage2SMO: notify_handler.notify_change_to_smo, } # type: Dict[Type[commands.Command], Callable] diff --git a/o2common/adapter/redis_eventpublisher.py b/o2common/adapter/redis_eventpublisher.py index 9fac313..e128ef7 100644 --- a/o2common/adapter/redis_eventpublisher.py +++ b/o2common/adapter/redis_eventpublisher.py @@ -28,4 +28,4 @@ r = redis.Redis(**config.get_redis_host_and_port()) def publish(channel, event: events.Event): logger.info("publishing: channel=%s, event=%s", channel, event) - r.publish(channel, json.dumps(asdict(event))) + r.publish(channel, json.dumps(asdict(event), default=str)) diff --git a/o2common/domain/base.py b/o2common/domain/base.py index 63d1659..130fdad 100644 --- a/o2common/domain/base.py +++ b/o2common/domain/base.py @@ -15,11 +15,14 @@ from datetime import datetime from typing import List from sqlalchemy.inspection import inspect +from sqlalchemy.exc import NoInspectionAvailable from .events import Event class AgRoot: + events = [] + def __init__(self) -> None: self.hash = "" self.updatetime = datetime.now() @@ -27,17 +30,23 @@ class AgRoot: self.events = [] # type: List[Event] # self.id = "" + # def append_event(self, event: Event): + # self.events = self.events.append(event) + class Serializer(object): def serialize(self): - # d = {c: getattr(self, c) for c in inspect(self).attrs.keys()} - # if 'createtime' in d: - # d['createtime'] = d['createtime'].isoformat() - # if 'updatetime' in d: - # d['updatetime'] = d['updatetime'].isoformat() - # return d - return {c: getattr(self, c) for c in inspect(self).attrs.keys()} + try: + # d = {c: getattr(self, c) for c in inspect(self).attrs.keys()} + # if 'createtime' in d: + # d['createtime'] = d['createtime'].isoformat() + # if 'updatetime' in d: + # d['updatetime'] = d['updatetime'].isoformat() + # return d + return {c: getattr(self, c) for c in inspect(self).attrs.keys()} + except NoInspectionAvailable: + return self.__dict__ @staticmethod def serialize_list(li): diff --git a/o2common/service/watcher/worker.py b/o2common/service/watcher/worker.py index 64d189d..dbc67b7 100644 --- a/o2common/service/watcher/worker.py +++ b/o2common/service/watcher/worker.py @@ -14,6 +14,7 @@ import time import sched +# from o2common.service.unit_of_work import AbstractUnitOfWork from o2common.service.watcher.base import WatcherTree from o2common.helper import o2logging @@ -21,12 +22,13 @@ logger = o2logging.get_logger(__name__) class PollWorker(object): - def __init__(self, interval=10) -> None: + def __init__(self, interval=10, bus=None) -> None: super().__init__() self.watchers = [] self.schedinstance = sched.scheduler(time.time, time.sleep) self.schedinterval = interval self._stopped = True + self._bus = bus def set_interval(self, interval): if interval > 0: @@ -48,6 +50,13 @@ class PollWorker(object): except Exception as ex: logger.warning("Worker raises exception:" + str(ex)) continue + + # handle events + if self._bus is not None: + events = self._bus.uow.collect_new_events() + for event in events: + self._bus.handle(event) + self.schedinstance.enter(self.schedinterval, 1, self._repeat) # note the sched run will block current thread diff --git a/o2ims/adapter/clients/ocloud_sa_client.py b/o2ims/adapter/clients/ocloud_sa_client.py index 97592d3..d0e12de 100644 --- a/o2ims/adapter/clients/ocloud_sa_client.py +++ b/o2ims/adapter/clients/ocloud_sa_client.py @@ -161,7 +161,7 @@ class StxSaClientImp(object): logger.debug('host 1:' + str(hosts[0].to_dict())) return [ocloudModel.StxGenericModel( ResourceTypeEnum.PSERVER, self._hostconverter(host)) - for host in hosts if host] + for host in hosts if host and host.availability == 'available'] def getPserver(self, id) -> ocloudModel.StxGenericModel: host = self.stxclient.ihost.get(id) diff --git a/o2ims/adapter/ocloud_repository.py b/o2ims/adapter/ocloud_repository.py index ab1d5c4..4880fa1 100644 --- a/o2ims/adapter/ocloud_repository.py +++ b/o2ims/adapter/ocloud_repository.py @@ -14,10 +14,10 @@ from typing import List -from o2ims.domain import ocloud +from o2ims.domain import ocloud, subscription_obj from o2ims.domain.ocloud_repo import OcloudRepository, ResourceTypeRepository,\ - ResourcePoolRepository, ResourceRepository, DeploymentManagerRepository,\ - SubscriptionRepository + ResourcePoolRepository, ResourceRepository, DeploymentManagerRepository +from o2ims.domain.subscription_repo import SubscriptionRepository from o2common.helper import o2logging logger = o2logging.get_logger(__name__) @@ -143,19 +143,19 @@ class SubscriptionSqlAlchemyRepository(SubscriptionRepository): super().__init__() self.session = session - def _add(self, subscription: ocloud.Subscription): + def _add(self, subscription: subscription_obj.Subscription): self.session.add(subscription) - def _get(self, subscription_id) -> ocloud.Subscription: - return self.session.query(ocloud.Subscription).filter_by( + def _get(self, subscription_id) -> subscription_obj.Subscription: + return self.session.query(subscription_obj.Subscription).filter_by( subscriptionId=subscription_id).first() - def _list(self) -> List[ocloud.Subscription]: - return self.session.query(ocloud.Subscription) + def _list(self) -> List[subscription_obj.Subscription]: + return self.session.query(subscription_obj.Subscription) - def _update(self, subscription: ocloud.Subscription): + def _update(self, subscription: subscription_obj.Subscription): self.session.add(subscription) def _delete(self, subscription_id): - self.session.query(ocloud.Subscription).filter_by( + self.session.query(subscription_obj.Subscription).filter_by( subscriptionId=subscription_id).delete() diff --git a/o2ims/adapter/orm.py b/o2ims/adapter/orm.py index 7ad7a20..bb5c984 100644 --- a/o2ims/adapter/orm.py +++ b/o2ims/adapter/orm.py @@ -32,6 +32,7 @@ from sqlalchemy.orm import mapper, relationship # from sqlalchemy.sql.sqltypes import Integer from o2ims.domain import ocloud as ocloudModel +from o2ims.domain import subscription_obj as subModel from o2ims.domain.resource_type import ResourceTypeEnum from o2common.helper import o2logging @@ -166,7 +167,7 @@ def start_o2ims_mappers(engine=None): "resourcePools": relationship(resourcepool_mapper) } ) - mapper(ocloudModel.Subscription, subscription) + mapper(subModel.Subscription, subscription) if engine is not None: metadata.create_all(engine) diff --git a/o2ims/domain/commands.py b/o2ims/domain/commands.py index 657b48f..2b2aca6 100644 --- a/o2ims/domain/commands.py +++ b/o2ims/domain/commands.py @@ -19,6 +19,7 @@ from dataclasses import dataclass # from datetime import datetime # from o2ims.domain.resource_type import ResourceTypeEnum from o2ims.domain.stx_object import StxGenericModel +from o2ims.domain.subscription_obj import Message2SMO from o2common.domain.commands import Command @@ -27,6 +28,11 @@ class UpdateStxObject(Command): data: StxGenericModel +@dataclass +class PubMessage2SMO(Command): + data: Message2SMO + + @dataclass class UpdateOCloud(UpdateStxObject): pass diff --git a/o2ims/domain/events.py b/o2ims/domain/events.py index 7ab04a0..a4a2375 100644 --- a/o2ims/domain/events.py +++ b/o2ims/domain/events.py @@ -14,14 +14,34 @@ # pylint: disable=too-few-public-methods from dataclasses import dataclass +from datetime import datetime from o2common.domain.events import Event +from o2ims.domain.subscription_obj import NotificationEventEnum @dataclass -class OcloudUpdated(Event): - oCloudId: str +class OcloudChanged(Event): + id: str + notificationEventType: NotificationEventEnum + updatetime: datetime.now() @dataclass -class ResourceTypeUpdated(Event): - oCloudId: str +class ResourceTypeChanged(Event): + id: str + updatetime: datetime.now() + + +@dataclass +class ResourcePoolChanged(Event): + id: str + notificationEventType: NotificationEventEnum + updatetime: datetime.now() + + +@dataclass +class ResourceChanged(Event): + id: str + resourcePoolId: str + notificationEventType: NotificationEventEnum + updatetime: datetime.now() diff --git a/o2ims/domain/ocloud.py b/o2ims/domain/ocloud.py index e3584ed..1323134 100644 --- a/o2ims/domain/ocloud.py +++ b/o2ims/domain/ocloud.py @@ -23,17 +23,6 @@ from .resource_type import ResourceTypeEnum # from uuid import UUID -class Subscription(AgRoot, Serializer): - def __init__(self, id: str, callback: str, consumersubid: str = '', - filter: str = '') -> None: - super().__init__() - self.subscriptionId = id - self.version_number = 0 - self.callback = callback - self.consumerSubscriptionId = consumersubid - self.filter = filter - - class DeploymentManager(AgRoot, Serializer): def __init__(self, id: str, name: str, ocloudid: str, dmsendpoint: str, description: str = '', diff --git a/o2ims/domain/ocloud_repo.py b/o2ims/domain/ocloud_repo.py index b224a68..c513eed 100644 --- a/o2ims/domain/ocloud_repo.py +++ b/o2ims/domain/ocloud_repo.py @@ -187,43 +187,3 @@ class DeploymentManagerRepository(abc.ABC): @abc.abstractmethod def _update(self, deployment_manager: ocloud.DeploymentManager): raise NotImplementedError - - -class SubscriptionRepository(abc.ABC): - def __init__(self): - self.seen = set() # type: Set[ocloud.Subscription] - - def add(self, subscription: ocloud.Subscription): - self._add(subscription) - self.seen.add(subscription) - - def get(self, subscription_id) -> ocloud.Subscription: - subscription = self._get(subscription_id) - if subscription: - self.seen.add(subscription) - return subscription - - def list(self) -> List[ocloud.Subscription]: - return self._list() - - def update(self, subscription: ocloud.Subscription): - self._update(subscription) - - def delete(self, subscription_id): - self._delete(subscription_id) - - @abc.abstractmethod - def _add(self, subscription: ocloud.Subscription): - raise NotImplementedError - - @abc.abstractmethod - def _get(self, subscription_id) -> ocloud.Subscription: - raise NotImplementedError - - @abc.abstractmethod - def _update(self, subscription: ocloud.Subscription): - raise NotImplementedError - - @abc.abstractmethod - def _delete(self, subscription_id): - raise NotImplementedError diff --git a/o2ims/domain/subscription_obj.py b/o2ims/domain/subscription_obj.py new file mode 100644 index 0000000..dc3145f --- /dev/null +++ b/o2ims/domain/subscription_obj.py @@ -0,0 +1,57 @@ +# Copyright (C) 2021 Wind River Systems, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations +from enum import Enum +from dataclasses import dataclass + +from o2common.domain.base import AgRoot, Serializer + + +class Subscription(AgRoot, Serializer): + def __init__(self, id: str, callback: str, consumersubid: str = '', + filter: str = '') -> None: + super().__init__() + self.subscriptionId = id + self.version_number = 0 + self.callback = callback + self.consumerSubscriptionId = consumersubid + self.filter = filter + + +class NotificationEventEnum(str, Enum): + CREATE = 'CREATE' + MODIFY = 'MODIFY' + DELETE = 'DELETE' + + +class Message2SMO(Serializer): + def __init__(self, eventtype: NotificationEventEnum, + id: str, ref: str, updatetime: str) -> None: + self.notificationEventType = eventtype + self.objectRef = ref + self.id = id + self.updatetime = updatetime + + +@dataclass +class EventState(): + Initial = 0 + NotInstalled = 1 + Installing = 2 + Installed = 3 + Updating = 4 + Uninstalling = 5 + Abnormal = 6 + Deleted = 7 diff --git a/o2ims/domain/subscription_repo.py b/o2ims/domain/subscription_repo.py new file mode 100644 index 0000000..d12c00d --- /dev/null +++ b/o2ims/domain/subscription_repo.py @@ -0,0 +1,57 @@ +# Copyright (C) 2021 Wind River Systems, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import abc +from typing import List, Set +from o2ims.domain import subscription_obj as subobj + + +class SubscriptionRepository(abc.ABC): + def __init__(self): + self.seen = set() # type: Set[subobj.Subscription] + + def add(self, subscription: subobj.Subscription): + self._add(subscription) + self.seen.add(subscription) + + def get(self, subscription_id) -> subobj.Subscription: + subscription = self._get(subscription_id) + if subscription: + self.seen.add(subscription) + return subscription + + def list(self) -> List[subobj.Subscription]: + return self._list() + + def update(self, subscription: subobj.Subscription): + self._update(subscription) + + def delete(self, subscription_id): + self._delete(subscription_id) + + @abc.abstractmethod + def _add(self, subscription: subobj.Subscription): + raise NotImplementedError + + @abc.abstractmethod + def _get(self, subscription_id) -> subobj.Subscription: + raise NotImplementedError + + @abc.abstractmethod + def _update(self, subscription: subobj.Subscription): + raise NotImplementedError + + @abc.abstractmethod + def _delete(self, subscription_id): + raise NotImplementedError diff --git a/o2ims/service/auditor/ocloud_handler.py b/o2ims/service/auditor/ocloud_handler.py index c0d8eaf..97cca3e 100644 --- a/o2ims/service/auditor/ocloud_handler.py +++ b/o2ims/service/auditor/ocloud_handler.py @@ -14,17 +14,20 @@ # pylint: disable=unused-argument from __future__ import annotations +from typing import Callable -from o2ims.domain.stx_object import StxGenericModel # from dataclasses import asdict # from typing import List, Dict, Callable, Type # TYPE_CHECKING -from o2ims.domain import commands + +from o2common.config import config +# from o2common.service.messagebus import MessageBus from o2common.service.unit_of_work import AbstractUnitOfWork -from o2ims.domain.resource_type import InvalidOcloudState -from o2ims.domain.resource_type import MismatchedModel +from o2ims.domain import events, commands from o2ims.domain.ocloud import Ocloud -from o2common.config import config +from o2ims.domain.stx_object import StxGenericModel +from o2ims.domain.resource_type import InvalidOcloudState, MismatchedModel +from o2ims.domain.subscription_obj import NotificationEventEnum # if TYPE_CHECKING: # from . import unit_of_work @@ -38,7 +41,8 @@ class InvalidResourceType(Exception): def update_ocloud( cmd: commands.UpdateOCloud, - uow: AbstractUnitOfWork + uow: AbstractUnitOfWork, + publish: Callable ): stxobj = cmd.data with uow: @@ -99,4 +103,8 @@ def update_by(ocloud: Ocloud, stxobj: StxGenericModel) -> None: # ocloud.content = stxobj.content ocloud.hash = stxobj.hash ocloud.version_number = ocloud.version_number + 1 - ocloud.events = [] + ocloud.events.append(events.OcloudChanged( + id=stxobj.id, + notificationEventType=NotificationEventEnum.MODIFY, + updatetime=stxobj.updatetime + )) diff --git a/o2ims/service/auditor/pserver_cpu_handler.py b/o2ims/service/auditor/pserver_cpu_handler.py index 76141bc..cac4690 100644 --- a/o2ims/service/auditor/pserver_cpu_handler.py +++ b/o2ims/service/auditor/pserver_cpu_handler.py @@ -17,11 +17,12 @@ from __future__ import annotations import uuid # import json -from o2ims.domain import commands +from o2ims.domain import commands, events from o2ims.domain.stx_object import StxGenericModel from o2common.service.unit_of_work import AbstractUnitOfWork from o2ims.domain.resource_type import MismatchedModel from o2ims.domain.ocloud import Resource, ResourceType +from o2ims.domain.subscription_obj import NotificationEventEnum from o2common.helper import o2logging logger = o2logging.get_logger(__name__) @@ -114,4 +115,9 @@ def update_by(target: Resource, stxobj: StxGenericModel, target.updatetime = stxobj.updatetime target.hash = stxobj.hash target.version_number = target.version_number + 1 - target.events = [] + target.events.append(events.ResourceChanged( + id=stxobj.id, + resourcePoolId=target.resourcePoolId, + notificationEventType=NotificationEventEnum.MODIFY, + updatetime=stxobj.updatetime + )) diff --git a/o2ims/service/auditor/pserver_eth_handler.py b/o2ims/service/auditor/pserver_eth_handler.py index e65440a..5f6c341 100644 --- a/o2ims/service/auditor/pserver_eth_handler.py +++ b/o2ims/service/auditor/pserver_eth_handler.py @@ -17,11 +17,12 @@ from __future__ import annotations import uuid # import json -from o2ims.domain import commands +from o2ims.domain import commands, events from o2ims.domain.stx_object import StxGenericModel from o2common.service.unit_of_work import AbstractUnitOfWork from o2ims.domain.resource_type import MismatchedModel from o2ims.domain.ocloud import Resource, ResourceType +from o2ims.domain.subscription_obj import NotificationEventEnum from o2common.helper import o2logging logger = o2logging.get_logger(__name__) @@ -114,4 +115,9 @@ def update_by(target: Resource, stxobj: StxGenericModel, target.updatetime = stxobj.updatetime target.hash = stxobj.hash target.version_number = target.version_number + 1 - target.events = [] + target.events.append(events.ResourceChanged( + id=stxobj.id, + resourcePoolId=target.resourcePoolId, + notificationEventType=NotificationEventEnum.MODIFY, + updatetime=stxobj.updatetime + )) diff --git a/o2ims/service/auditor/pserver_handler.py b/o2ims/service/auditor/pserver_handler.py index 63b0534..f5df381 100644 --- a/o2ims/service/auditor/pserver_handler.py +++ b/o2ims/service/auditor/pserver_handler.py @@ -16,9 +16,11 @@ from __future__ import annotations import uuid # import json +from typing import Callable -from o2ims.domain import commands +from o2ims.domain import commands, events from o2ims.domain.stx_object import StxGenericModel +from o2ims.domain.subscription_obj import NotificationEventEnum from o2common.service.unit_of_work import AbstractUnitOfWork from o2ims.domain.resource_type import MismatchedModel from o2ims.domain.ocloud import Resource, ResourceType @@ -33,7 +35,8 @@ class InvalidResourceType(Exception): def update_pserver( cmd: commands.UpdatePserver, - uow: AbstractUnitOfWork + uow: AbstractUnitOfWork, + publish: Callable ): stxobj = cmd.data with uow: @@ -115,4 +118,9 @@ def update_by(target: Resource, stxobj: StxGenericModel, target.updatetime = stxobj.updatetime target.hash = stxobj.hash target.version_number = target.version_number + 1 - target.events = [] + target.events.append(events.ResourceChanged( + id=stxobj.id, + resourcePoolId=target.resourcePoolId, + notificationEventType=NotificationEventEnum.MODIFY, + updatetime=stxobj.updatetime + )) diff --git a/o2ims/service/auditor/pserver_if_handler.py b/o2ims/service/auditor/pserver_if_handler.py index 7ee2df2..cd4e680 100644 --- a/o2ims/service/auditor/pserver_if_handler.py +++ b/o2ims/service/auditor/pserver_if_handler.py @@ -17,11 +17,12 @@ from __future__ import annotations import uuid # import json -from o2ims.domain import commands +from o2ims.domain import commands, events from o2ims.domain.stx_object import StxGenericModel from o2common.service.unit_of_work import AbstractUnitOfWork from o2ims.domain.resource_type import MismatchedModel from o2ims.domain.ocloud import Resource, ResourceType +from o2ims.domain.subscription_obj import NotificationEventEnum from o2common.helper import o2logging logger = o2logging.get_logger(__name__) @@ -114,4 +115,9 @@ def update_by(target: Resource, stxobj: StxGenericModel, target.updatetime = stxobj.updatetime target.hash = stxobj.hash target.version_number = target.version_number + 1 - target.events = [] + target.events.append(events.ResourceChanged( + id=stxobj.id, + resourcePoolId=target.resourcePoolId, + notificationEventType=NotificationEventEnum.MODIFY, + updatetime=stxobj.updatetime + )) diff --git a/o2ims/service/auditor/pserver_mem_handler.py b/o2ims/service/auditor/pserver_mem_handler.py index a07072b..683475e 100644 --- a/o2ims/service/auditor/pserver_mem_handler.py +++ b/o2ims/service/auditor/pserver_mem_handler.py @@ -17,8 +17,9 @@ from __future__ import annotations import uuid # import json -from o2ims.domain import commands +from o2ims.domain import commands, events from o2ims.domain.stx_object import StxGenericModel +from o2ims.domain.subscription_obj import NotificationEventEnum from o2common.service.unit_of_work import AbstractUnitOfWork from o2ims.domain.resource_type import MismatchedModel from o2ims.domain.ocloud import Resource, ResourceType @@ -114,4 +115,9 @@ def update_by(target: Resource, stxobj: StxGenericModel, target.updatetime = stxobj.updatetime target.hash = stxobj.hash target.version_number = target.version_number + 1 - target.events = [] + target.events.append(events.ResourceChanged( + id=stxobj.id, + resourcePoolId=target.resourcePoolId, + notificationEventType=NotificationEventEnum.MODIFY, + updatetime=stxobj.updatetime + )) diff --git a/o2ims/service/auditor/pserver_port_handler.py b/o2ims/service/auditor/pserver_port_handler.py index 41bed8e..bfb2e1d 100644 --- a/o2ims/service/auditor/pserver_port_handler.py +++ b/o2ims/service/auditor/pserver_port_handler.py @@ -17,11 +17,12 @@ from __future__ import annotations import uuid # import json -from o2ims.domain import commands +from o2ims.domain import commands, events from o2ims.domain.stx_object import StxGenericModel from o2common.service.unit_of_work import AbstractUnitOfWork from o2ims.domain.resource_type import MismatchedModel from o2ims.domain.ocloud import Resource, ResourceType +from o2ims.domain.subscription_obj import NotificationEventEnum from o2common.helper import o2logging logger = o2logging.get_logger(__name__) @@ -114,4 +115,9 @@ def update_by(target: Resource, stxobj: StxGenericModel, target.updatetime = stxobj.updatetime target.hash = stxobj.hash target.version_number = target.version_number + 1 - target.events = [] + target.events.append(events.ResourceChanged( + id=stxobj.id, + resourcePoolId=target.resourcePoolId, + notificationEventType=NotificationEventEnum.MODIFY, + updatetime=stxobj.updatetime + )) diff --git a/o2ims/service/auditor/resourcepool_handler.py b/o2ims/service/auditor/resourcepool_handler.py index 88cf182..b5b0e0d 100644 --- a/o2ims/service/auditor/resourcepool_handler.py +++ b/o2ims/service/auditor/resourcepool_handler.py @@ -20,11 +20,12 @@ from o2ims.domain.stx_object import StxGenericModel # from dataclasses import asdict # from typing import List, Dict, Callable, Type # TYPE_CHECKING -from o2ims.domain import commands +from o2ims.domain import commands, events from o2common.service.unit_of_work import AbstractUnitOfWork # from o2ims.domain.resource_type import InvalidOcloudState from o2ims.domain.resource_type import MismatchedModel from o2ims.domain.ocloud import ResourcePool +from o2ims.domain.subscription_obj import NotificationEventEnum # if TYPE_CHECKING: # from . import unit_of_work @@ -98,4 +99,8 @@ def update_by(target: ResourcePool, stxobj: StxGenericModel, target.hash = stxobj.hash target.oCloudId = parentid target.version_number = target.version_number + 1 - target.events = [] + target.events.append(events.ResourcePoolChanged( + id=stxobj.id, + notificationEventType=NotificationEventEnum.MODIFY, + updatetime=stxobj.updatetime + )) diff --git a/o2ims/service/event/__init__.py b/o2ims/service/event/__init__.py new file mode 100644 index 0000000..b514342 --- /dev/null +++ b/o2ims/service/event/__init__.py @@ -0,0 +1,13 @@ +# Copyright (C) 2021 Wind River Systems, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/o2ims/service/event/notify_handler.py b/o2ims/service/event/notify_handler.py new file mode 100644 index 0000000..3ece84a --- /dev/null +++ b/o2ims/service/event/notify_handler.py @@ -0,0 +1,99 @@ +# Copyright (C) 2021 Wind River Systems, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json +import redis +# import requests +import http.client +from urllib.parse import urlparse + +from o2common.config import config +from o2common.service.unit_of_work import AbstractUnitOfWork +from o2ims.domain import commands +from o2ims.domain.subscription_obj import Subscription, Message2SMO + +from o2common.helper import o2logging +logger = o2logging.get_logger(__name__) + +# Maybe another MQ server +r = redis.Redis(**config.get_redis_host_and_port()) + + +def notify_change_to_smo( + cmd: commands.PubMessage2SMO, + uow: AbstractUnitOfWork, +): + logger.info('In notify_change_to_smo') + data = cmd.data + with uow: + subs = uow.subscriptions.list() + for sub in subs: + sub_data = sub.serialize() + logger.debug('Subscription: {}'.format(sub_data['subscriptionId'])) + + try: + resource_filter = json.loads(sub_data['filter']) + if len(resource_filter) > 0: + resource = uow.resources.get(data.id) + logger.debug(type(resource)) + if resource: # TODO deal with resource is empty + res_type_id = resource.serialize()['resourceTypeId'] + resourcetype = uow.resource_types.get(res_type_id) + logger.debug(resourcetype.name) + if resourcetype.name not in resource_filter: + continue + except json.decoder.JSONDecodeError as err: + logger.warning( + 'subscription filter decode json failed: {}'.format(err)) + + callback_smo(sub, data) + + +def callback_smo(sub: Subscription, msg: Message2SMO): + sub_data = sub.serialize() + callback_data = json.dumps({ + 'consumerSubscriptionId': sub_data['consumerSubscriptionId'], + 'notificationEventType': msg.notificationEventType, + 'objectRef': msg.objectRef, + 'updateTime': msg.updatetime + }) + logger.info('URL: {}, data: {}'.format( + sub_data['callback'], callback_data)) + # r.publish(sub_data['subscriptionId'], json.dumps({ + # 'consumerSubscriptionId': sub_data['consumerSubscriptionId'], + # 'notificationEventType': msg.notificationEventType, + # 'objectRef': msg.objectRef + # })) + # try: + # headers = {'User-Agent': 'Mozilla/5.0'} + # resp = requests.post(sub_data['callback'], data=callback_data, + # headers=headers) + # if resp.status_code == 202 or resp.status_code == 200: + # logger.info('Notify to SMO successed') + # return + # logger.error('Response code is: {}'.format(resp.status_code)) + # except requests.exceptions.HTTPError as err: + # logger.error('request smo error: {}'.format(err)) + o = urlparse(sub_data['callback']) + conn = http.client.HTTPConnection(o.netloc) + headers = {'Content-type': 'application/json'} + conn.request('POST', o.path, callback_data, headers) + resp = conn.getresponse() + data = resp.read().decode('utf-8') + # json_data = json.loads(data) + if resp.status == 202 or resp.status == 200: + logger.info('Notify to SMO successed, response code {} {}, data {}'. + format(resp.status, resp.reason, data)) + return + logger.error('Response code is: {}'.format(resp.status)) diff --git a/o2ims/service/event/ocloud_event.py b/o2ims/service/event/ocloud_event.py new file mode 100644 index 0000000..dbeb448 --- /dev/null +++ b/o2ims/service/event/ocloud_event.py @@ -0,0 +1,30 @@ +# Copyright (C) 2021 Wind River Systems, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Callable + +from o2ims.domain import events + +from o2common.helper import o2logging +logger = o2logging.get_logger(__name__) + + +def notify_ocloud_update( + event: events.OcloudChanged, + publish: Callable, +): + logger.info('In notify_ocloud_update') + publish("OcloudChanged", event) + logger.debug("published Ocloud Changed: {}".format( + event.id)) diff --git a/o2ims/service/event/resource_event.py b/o2ims/service/event/resource_event.py new file mode 100644 index 0000000..2a749b7 --- /dev/null +++ b/o2ims/service/event/resource_event.py @@ -0,0 +1,30 @@ +# Copyright (C) 2021 Wind River Systems, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Callable + +from o2ims.domain import events + +from o2common.helper import o2logging +logger = o2logging.get_logger(__name__) + + +def notify_resource_change( + event: events.ResourceChanged, + publish: Callable, +): + logger.info('In notify_resource_change') + publish("ResourceChanged", event) + logger.debug("published Resource Changed: {}".format( + event.id)) diff --git a/o2ims/service/event/resource_pool_event.py b/o2ims/service/event/resource_pool_event.py new file mode 100644 index 0000000..9c78d5b --- /dev/null +++ b/o2ims/service/event/resource_pool_event.py @@ -0,0 +1,30 @@ +# Copyright (C) 2021 Wind River Systems, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Callable + +from o2ims.domain import events + +from o2common.helper import o2logging +logger = o2logging.get_logger(__name__) + + +def notify_resourcepool_change( + event: events.ResourcePoolChanged, + publish: Callable, +): + logger.info('In notify_resourcepool_change') + publish("ResourcePoolChanged", event) + logger.debug("published Resource Pool Changed: {}".format( + event.id)) diff --git a/o2ims/views/ocloud_view.py b/o2ims/views/ocloud_view.py index f970517..3735298 100644 --- a/o2ims/views/ocloud_view.py +++ b/o2ims/views/ocloud_view.py @@ -16,7 +16,7 @@ import uuid from o2common.service import unit_of_work from o2ims.views.ocloud_dto import SubscriptionDTO -from o2ims.domain.ocloud import Subscription +from o2ims.domain.subscription_obj import Subscription def oclouds(uow: unit_of_work.AbstractUnitOfWork): @@ -41,7 +41,6 @@ def resource_type_one(resourceTypeId: str, uow: unit_of_work.AbstractUnitOfWork): with uow: first = uow.resource_types.get(resourceTypeId) - print(first) return first.serialize() if first is not None else None diff --git a/requirements-test.txt b/requirements-test.txt index 3e9d882..9b6cbe9 100644 --- a/requirements-test.txt +++ b/requirements-test.txt @@ -5,6 +5,7 @@ requests tox pytest +pytest-cov pytest-icdiff mock diff --git a/tests/integration/test_ocloud_repository.py b/tests/integration/test_ocloud_repository.py index 6a67447..df91d29 100644 --- a/tests/integration/test_ocloud_repository.py +++ b/tests/integration/test_ocloud_repository.py @@ -17,7 +17,7 @@ import pytest from o2ims.domain import resource_type as rt from o2ims.adapter import ocloud_repository as repository -from o2ims.domain import ocloud +from o2ims.domain import ocloud, subscription_obj from o2common.config import config pytestmark = pytest.mark.usefixtures("mappers") @@ -158,7 +158,7 @@ def test_add_subscription(sqlite_session_factory): session = sqlite_session_factory() repo = repository.SubscriptionSqlAlchemyRepository(session) subscription_id1 = str(uuid.uuid4()) - subscription1 = ocloud.Subscription( + subscription1 = subscription_obj.Subscription( subscription_id1, "https://callback/uri/write/here") repo.add(subscription1) assert repo.get(subscription_id1) == subscription1 diff --git a/tests/integration/test_ocloud_view.py b/tests/integration/test_ocloud_view.py index d44bedf..510bf40 100644 --- a/tests/integration/test_ocloud_view.py +++ b/tests/integration/test_ocloud_view.py @@ -17,7 +17,7 @@ import pytest from o2common.config import config from o2ims.views import ocloud_view -from o2ims.domain import ocloud +from o2ims.domain import ocloud, subscription_obj from o2ims.domain import resource_type as rt @@ -200,7 +200,7 @@ def test_view_deployment_manager_one(sqlite_uow): def test_view_subscriptions(sqlite_uow): subscription_id1 = str(uuid.uuid4()) - subscription1 = ocloud.Subscription( + subscription1 = subscription_obj.Subscription( subscription_id1, "https://callback/uri/write/here") with sqlite_uow as uow: uow.subscriptions.add(subscription1) @@ -214,7 +214,7 @@ def test_view_subscriptions(sqlite_uow): def test_view_subscription_one(sqlite_uow): subscription_id1 = str(uuid.uuid4()) - subscription1 = ocloud.Subscription( + subscription1 = subscription_obj.Subscription( subscription_id1, "https://callback/uri/write/here") # Query return None @@ -235,7 +235,7 @@ def test_view_subscription_one(sqlite_uow): def test_view_subscription_delete(sqlite_uow): subscription_id1 = str(uuid.uuid4()) - subscription1 = ocloud.Subscription( + subscription1 = subscription_obj.Subscription( subscription_id1, "https://callback/uri/write/here") with sqlite_uow as uow: diff --git a/tests/mock_smo/subscription.py b/tests/mock_smo/subscription.py new file mode 100644 index 0000000..d2ab3c7 --- /dev/null +++ b/tests/mock_smo/subscription.py @@ -0,0 +1,110 @@ +# Copyright (C) 2021 Wind River Systems, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import redis +import json +import signal +import http.client +# from o2common.config import config + +# from o2common.helper import o2logging +# logger = o2logging.get_logger(__name__) + +# r = redis.Redis(**config.get_redis_host_and_port()) +r = redis.Redis(host='127.0.0.1', port=63791) + +# apibase = config.get_o2ims_api_base() +# url = config.get_api_url() +apibase = '/o2ims_infrastructureInventory/v1' +url = '127.0.0.1:5005' + + +class Singleton(type): + _instances = {} + + def __call__(cls, *args, **kwargs): + if cls not in cls._instances: + cls._instances[cls] = super( + Singleton, cls).__call__(*args, **kwargs) + return cls._instances[cls] + + +class Subscription(metaclass=Singleton): + + def __init__(self, sub_id='') -> None: + self.url = url + self.subId = sub_id + + def subscription_ims(self): + conn = http.client.HTTPConnection(self.url) + headers = {'Content-type': 'application/json'} + post_val = { + 'callback': self.url, + 'consumerSubscriptionId': 'mock_smo', + 'filter': '["pserver","pserver_ram"]' + } + json_val = json.dumps(post_val) + conn.request('POST', apibase+'/subscriptions', json_val, headers) + resp = conn.getresponse() + data = resp.read().decode('utf-8') + print(resp.status, resp.reason) + print(data) + json_data = json.loads(data) + self.subId = json_data['subscriptionId'] + + def subscription_mq(self): + sub = r.pubsub(ignore_subscribe_messages=True) + sub.subscribe(self.subId) + + for m in sub.listen(): + try: + # logger.info("handling %s", m) + print("handling %s", m) + channel = m['channel'].decode("UTF-8") + if channel == self.subId: + datastr = m['data'] + data = json.loads(datastr) + # logger.info('notification: {}'.format(data)) + print('notification: {}'.format(data)) + else: + # logger.info("unhandled:{}".format(channel)) + print("unhandled:{}".format(channel)) + except Exception as ex: + # logger.warning("{}".format(str(ex))) + print("[WARNING]{}".format(str(ex))) + continue + + def unsubscription_ims(self): + conn = http.client.HTTPConnection(self.url) + conn.request('DELETE', apibase + '/subscriptions/' + self.subId) + resp = conn.getresponse() + print(resp.status, resp.reason) + + +def handler(signum, frame): + print('\nCtrl-c was pressed. Call to delete subscription') + sub = Subscription() + sub.unsubscription_ims() + exit() + + +def main(): + sub = Subscription() + sub.subscription_ims() + signal.signal(signal.SIGINT, handler) + sub.subscription_mq() + + +if __name__ == "__main__": + main() diff --git a/tests/unit/test_ocloud.py b/tests/unit/test_ocloud.py index c92cb22..13fc436 100644 --- a/tests/unit/test_ocloud.py +++ b/tests/unit/test_ocloud.py @@ -15,7 +15,7 @@ import uuid from unittest.mock import MagicMock -from o2ims.domain import ocloud +from o2ims.domain import ocloud, subscription_obj from o2ims.domain import resource_type as rt from o2ims.views import ocloud_view from o2common.config import config @@ -88,7 +88,7 @@ def test_new_deployment_manager(): def test_new_subscription(): subscription_id1 = str(uuid.uuid4()) - subscription1 = ocloud.Subscription( + subscription1 = subscription_obj.Subscription( subscription_id1, "https://callback/uri/write/here") assert subscription_id1 is not None and\ subscription1.subscriptionId == subscription_id1 diff --git a/tests/unit/test_watcher.py b/tests/unit/test_watcher.py index c4b134d..806d20f 100644 --- a/tests/unit/test_watcher.py +++ b/tests/unit/test_watcher.py @@ -15,7 +15,7 @@ import time from datetime import datetime import json -from typing import List +from typing import Callable, List # from o2common.config import config import uuid from o2common.service.watcher.base import BaseWatcher, WatcherTree @@ -129,7 +129,8 @@ class FakeUnitOfWork(AbstractUnitOfWork): def create_fake_bus(uow): def update_ocloud( cmd: commands.UpdateOCloud, - uow: AbstractUnitOfWork): + uow: AbstractUnitOfWork, + publish: Callable): return fakeuow = FakeUnitOfWork()