- postgres
- redis
environment:
+ - API_HOST=api
- DB_HOST=postgres
- DB_PASSWORD=o2ims123
- REDIS_HOST=redis
return '', 202\r
\r
\r
+@app.route('/registration', methods=['POST'])\r
+def registration():\r
+ logger.info('Registration data: {}'.format(request.get_data()))\r
+ return '', 200\r
+\r
+\r
def subscription_ims(url, consumerSubscriptionId):\r
sub_key = r.get(REDIS_SUB_KEY)\r
logger.info('Subscription key is {}'.format(sub_key))\r
for entry in self.deployment_managers.seen:\r
while hasattr(entry, 'events') and len(entry.events) > 0:\r
yield entry.events.pop(0)\r
+ for entry in self.subscriptions.seen:\r
+ while hasattr(entry, 'events') and len(entry.events) > 0:\r
+ yield entry.events.pop(0)\r
+ for entry in self.registrations.seen:\r
+ while hasattr(entry, 'events') and len(entry.events) > 0:\r
+ yield entry.events.pop(0)\r
for entry in self.nfdeployment_descs.seen:\r
while hasattr(entry, 'events') and len(entry.events) > 0:\r
yield entry.events.pop(0)\r
from o2ims.domain import commands as imscmd
from o2common.helper import o2logging
-from o2ims.domain.subscription_obj import Message2SMO
+from o2ims.domain.subscription_obj import Message2SMO, NotificationEventEnum, RegistrationMessage
logger = o2logging.get_logger(__name__)
r = redis.Redis(**config.get_redis_host_and_port())
pubsub = r.pubsub(ignore_subscribe_messages=True)
pubsub.subscribe("NfDeploymentStateChanged")
pubsub.subscribe('ResourceChanged')
+ pubsub.subscribe('RegistrationChanged')
+ pubsub.subscribe('OcloudChanged')
for m in pubsub.listen():
try:
ToState=data['ToState']
)
bus.handle(cmd)
- if channel == 'ResourceChanged':
+ elif channel == 'ResourceChanged':
datastr = m['data']
data = json.loads(datastr)
logger.info('ResourceChanged with cmd:{}'.format(data))
eventtype=data['notificationEventType'],
updatetime=data['updatetime']))
bus.handle(cmd)
+ elif channel == 'RegistrationChanged':
+ datastr = m['data']
+ data = json.loads(datastr)
+ logger.info('RegistrationChanged with cmd:{}'.format(data))
+ cmd = imscmd.Register2SMO(data=RegistrationMessage(id=data['id']))
+ bus.handle(cmd)
+ elif channel == 'OcloudChanged':
+ datastr = m['data']
+ data = json.loads(datastr)
+ logger.info('OcloudChanged with cmd:{}'.format(data))
+ if data['notificationEventType'] == NotificationEventEnum.CREATE:
+ cmd = imscmd.Register2SMO(data=RegistrationMessage(is_all=True))
+ bus.handle(cmd)
else:
logger.info("unhandled:{}".format(channel))
resourcepool_handler, pserver_handler, pserver_cpu_handler, \
pserver_mem_handler, pserver_port_handler, pserver_if_handler,\
pserver_eth_handler
-from o2ims.service.command import notify_handler
-from o2ims.service.event import ocloud_event, \
- resource_event, resource_pool_event
+from o2ims.service.command import notify_handler, registration_handler
+from o2ims.service.event import ocloud_event, resource_event, \
+ resource_pool_event, registration_event
# if TYPE_CHECKING:
# from . import unit_of_work
events.ResourceChanged: [resource_event.notify_resource_change],
events.ResourcePoolChanged: [resource_pool_event.\
notify_resourcepool_change],
+ events.RegistrationChanged: [registration_event.\
+ notify_registration_change],
} # type: Dict[Type[events.Event], Callable]
o2dms_cmmands.DeleteNfDeployment:
nfdeployment_handler.delete_nfdeployment,
commands.PubMessage2SMO: notify_handler.notify_change_to_smo,
+ commands.Register2SMO: registration_handler.registry_to_smo,
} # type: Dict[Type[commands.Command], Callable]
from typing import List\r
from sqlalchemy.inspection import inspect\r
from sqlalchemy.exc import NoInspectionAvailable\r
+# from sqlalchemy.orm.exc import DetachedInstanceError\r
from .events import Event\r
\r
\r
# pylint: disable=too-few-public-methods
# from dataclasses import dataclass
+# from datetime import datetime
+
class Event:
pass
# Date,\r
DateTime,\r
ForeignKey,\r
- Boolean,\r
+ # Boolean,\r
# engine,\r
# event,\r
)\r
\r
Column("registrationId", String(255), primary_key=True),\r
Column("callback", String(255)),\r
- Column("notified", Boolean),\r
+ Column("status", String(255)),\r
+ Column("comments", String(255)),\r
)\r
\r
\r
# from datetime import datetime
# from o2ims.domain.resource_type import ResourceTypeEnum
from o2ims.domain.stx_object import StxGenericModel
-from o2ims.domain.subscription_obj import Message2SMO
+from o2ims.domain.subscription_obj import Message2SMO, RegistrationMessage
from o2common.domain.commands import Command
data: Message2SMO
+@dataclass
+class Register2SMO(Command):
+ data: RegistrationMessage
+
+
@dataclass
class UpdateOCloud(UpdateStxObject):
pass
# pylint: disable=too-few-public-methods
from dataclasses import dataclass
from datetime import datetime
+
from o2common.domain.events import Event
from o2ims.domain.subscription_obj import NotificationEventEnum
resourcePoolId: str
notificationEventType: NotificationEventEnum
updatetime: datetime.now()
+
+
+@dataclass
+class RegistrationChanged(Event):
+ id: str
+ updatetime: datetime.now()
self.filter = filter
-class Registration(AgRoot, Serializer):
- def __init__(self, id: str, url: str) -> None:
- super().__init__()
- self.registrationId = id
- self.callback = url
- self.notified = False
-
-
class NotificationEventEnum(str, Enum):
CREATE = 'CREATE'
MODIFY = 'MODIFY'
self.updatetime = updatetime
+class RegistrationStatusEnum(str, Enum):
+ CREATED = 'CREATED'
+ NOTIFIED = 'NOTIFIED'
+ FAILED = 'FAILED'
+
+
+class Registration(AgRoot, Serializer):
+ def __init__(self, id: str, url: str,
+ status: RegistrationStatusEnum =
+ RegistrationStatusEnum.CREATED,
+ comments: str = '') -> None:
+ super().__init__()
+ self.registrationId = id
+ self.callback = url
+ self.status = status
+ self.comments = comments
+
+
+class RegistrationMessage(Serializer):
+ def __init__(self, is_all: bool = None, id: str = '') -> None:
+ self.all = is_all if is_all is not None else False
+ self.id = id
+
+
@dataclass
class EventState():
Initial = 0
# pylint: disable=unused-argument
from __future__ import annotations
-from typing import Callable
+# from typing import Callable
# from dataclasses import asdict
# from typing import List, Dict, Callable, Type
def update_ocloud(
cmd: commands.UpdateOCloud,
- uow: AbstractUnitOfWork,
- publish: Callable
+ uow: AbstractUnitOfWork
):
stxobj = cmd.data
with uow:
# limitations under the License.\r
\r
import json\r
-import redis\r
+# import redis\r
# import requests\r
import http.client\r
from urllib.parse import urlparse\r
\r
-from o2common.config import config\r
+# from o2common.config import config\r
from o2common.service.unit_of_work import AbstractUnitOfWork\r
from o2ims.domain import commands\r
from o2ims.domain.subscription_obj import Subscription, Message2SMO\r
from o2common.helper import o2logging\r
logger = o2logging.get_logger(__name__)\r
\r
-# Maybe another MQ server\r
-r = redis.Redis(**config.get_redis_host_and_port())\r
+# # Maybe another MQ server\r
+# r = redis.Redis(**config.get_redis_host_and_port())\r
\r
\r
def notify_change_to_smo(\r
--- /dev/null
+# Copyright (C) 2021 Wind River Systems, Inc.\r
+#\r
+# Licensed under the Apache License, Version 2.0 (the "License");\r
+# you may not use this file except in compliance with the License.\r
+# You may obtain a copy of the License at\r
+#\r
+# http://www.apache.org/licenses/LICENSE-2.0\r
+#\r
+# Unless required by applicable law or agreed to in writing, software\r
+# distributed under the License is distributed on an "AS IS" BASIS,\r
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+# See the License for the specific language governing permissions and\r
+# limitations under the License.\r
+\r
+# import time\r
+import json\r
+# import asyncio\r
+# import requests\r
+import http.client\r
+from urllib.parse import urlparse\r
+from retry import retry\r
+\r
+from o2common.service.unit_of_work import AbstractUnitOfWork\r
+from o2common.config import config\r
+from o2ims.domain import commands\r
+from o2ims.domain.subscription_obj import RegistrationStatusEnum\r
+\r
+from o2common.helper import o2logging\r
+logger = o2logging.get_logger(__name__)\r
+\r
+\r
+def registry_to_smo(\r
+ cmd: commands.Register2SMO,\r
+ uow: AbstractUnitOfWork,\r
+):\r
+ logger.info('In registry_to_smo')\r
+ data = cmd.data\r
+ logger.info('The Register2SMO all is {}'.format(data.all))\r
+ if data.all:\r
+ regs = uow.registrations.list()\r
+ for reg in regs:\r
+ reg_data = reg.serialize()\r
+ logger.debug('Registration: {}'.format(reg_data['registrationId']))\r
+\r
+ register_smo(uow, reg_data)\r
+ else:\r
+ with uow:\r
+ reg = uow.registrations.get(data.id)\r
+ if reg is None:\r
+ return\r
+ logger.debug('Registration: {}'.format(reg.registrationId))\r
+ reg_data = reg.serialize()\r
+ register_smo(uow, reg_data)\r
+\r
+\r
+def register_smo(uow, reg_data):\r
+ call_res = call_smo(reg_data)\r
+ logger.debug('Call SMO response is {}'.format(call_res))\r
+ if call_res:\r
+ reg = uow.registrations.get(reg_data['registrationId'])\r
+ if reg is None:\r
+ return\r
+ reg.status = RegistrationStatusEnum.NOTIFIED\r
+ logger.debug('Updating Registration: {}'.format(\r
+ reg.registrationId))\r
+ uow.registrations.update(reg)\r
+ uow.commit()\r
+\r
+\r
+# def retry(fun, max_tries=2):\r
+# for i in range(max_tries):\r
+# try:\r
+# time.sleep(5*i)\r
+# # await asyncio.sleep(5*i)\r
+# res = fun()\r
+# logger.debug('retry function result: {}'.format(res))\r
+# return res\r
+# except Exception:\r
+# continue\r
+\r
+\r
+@retry((ConnectionRefusedError), tries=2, delay=2)\r
+def call_smo(reg_data: dict):\r
+ callback_data = json.dumps({\r
+ 'consumerSubscriptionId': reg_data['registrationId'],\r
+ 'imsUrl': config.get_api_url()\r
+ })\r
+ logger.info('URL: {}, data: {}'.format(\r
+ reg_data['callback'], callback_data))\r
+\r
+ o = urlparse(reg_data['callback'])\r
+ conn = http.client.HTTPConnection(o.netloc)\r
+ headers = {'Content-type': 'application/json'}\r
+ conn.request('POST', o.path, callback_data, headers)\r
+ resp = conn.getresponse()\r
+ data = resp.read().decode('utf-8')\r
+ # json_data = json.loads(data)\r
+ if resp.status == 202 or resp.status == 200:\r
+ logger.info('Registrer to SMO successed, response code {} {}, data {}'.\r
+ format(resp.status, resp.reason, data))\r
+ return True\r
+ logger.error('Response code is: {}'.format(resp.status))\r
+ return False\r
--- /dev/null
+# Copyright (C) 2021 Wind River Systems, Inc.\r
+#\r
+# Licensed under the Apache License, Version 2.0 (the "License");\r
+# you may not use this file except in compliance with the License.\r
+# You may obtain a copy of the License at\r
+#\r
+# http://www.apache.org/licenses/LICENSE-2.0\r
+#\r
+# Unless required by applicable law or agreed to in writing, software\r
+# distributed under the License is distributed on an "AS IS" BASIS,\r
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+# See the License for the specific language governing permissions and\r
+# limitations under the License.\r
+\r
+from typing import Callable\r
+\r
+from o2ims.domain import events\r
+\r
+from o2common.helper import o2logging\r
+logger = o2logging.get_logger(__name__)\r
+\r
+\r
+def notify_registration_change(\r
+ event: events.RegistrationChanged,\r
+ publish: Callable,\r
+):\r
+ logger.info('In notify_registration_change')\r
+ publish("RegistrationChanged", event)\r
+ logger.debug("published Registration Changed: {}".format(\r
+ event.id))\r
@api_ims_inventory_v1.marshal_with(post_resp, code=201)
def post(self):
data = api_ims_inventory_v1.payload
- result = ocloud_view.registration_create(data, bus.uow)
+ result = ocloud_view.registration_create(data, bus)
return result, 201
# See the License for the specific language governing permissions and\r
# limitations under the License.\r
\r
+import logging\r
import uuid\r
+from datetime import datetime\r
\r
-from o2common.service import unit_of_work\r
+from o2common.service import unit_of_work, messagebus\r
+from o2ims.domain import events\r
from o2ims.views.ocloud_dto import RegistrationDTO, SubscriptionDTO\r
from o2ims.domain.subscription_obj import Registration, Subscription\r
\r
\r
\r
def registration_create(registrationDto: RegistrationDTO.registration,\r
- uow: unit_of_work.AbstractUnitOfWork):\r
+ bus: messagebus.MessageBus):\r
\r
reg_uuid = str(uuid.uuid4())\r
registration = Registration(\r
reg_uuid, registrationDto['callback'])\r
- with uow:\r
+ with bus.uow as uow:\r
uow.registrations.add(registration)\r
+ logging.debug('before event length {}'.format(\r
+ len(registration.events)))\r
+ registration.events.append(events.RegistrationChanged(\r
+ reg_uuid,\r
+ datetime.now()))\r
+ logging.debug('after event length {}'.format(len(registration.events)))\r
uow.commit()\r
+ _handle_events(bus)\r
return {"registrationId": reg_uuid}\r
\r
\r
uow.registrations.delete(registrationId)\r
uow.commit()\r
return True\r
+\r
+\r
+def _handle_events(bus: messagebus.MessageBus):\r
+ # handle events\r
+ events = bus.uow.collect_new_events()\r
+ for event in events:\r
+ bus.handle(event)\r
+ return True\r