*.pyc
.tox
*.log
+temp
\ No newline at end of file
FROM python:3.10-slim-buster\r
\r
RUN apt-get update; apt-get install -y git gcc\r
+\r
+# in case git repo is not accessable\r
+# RUN mkdir -p /cgtsclient\r
+# COPY temp/config /cgtsclient/\r
+RUN git clone https://opendev.org/starlingx/config.git /cgtsclient\r
+RUN pip install -e /cgtsclient/sysinv/cgts-client/cgts-client/\r
+\r
+# RUN mkdir -p /distcloud-client\r
+# COPY temp/distcloud-client /distcloud-client/\r
+RUN git clone https://opendev.org/starlingx/distcloud-client.git /distcloud-client/\r
+RUN pip install -e /distcloud-client/distributedcloud-client\r
+# in case git repo is not accessable\r
+\r
+\r
COPY requirements.txt /tmp/\r
+COPY requirements-stx.txt /tmp/\r
COPY constraints.txt /tmp/\r
\r
-RUN pip install -r /tmp/requirements.txt -c /tmp/constraints.txt\r
+RUN pip install -r /tmp/requirements.txt -c /tmp/constraints.txt\r
+\r
+# RUN pip install -r /tmp/requirements-stx.txt\r
\r
COPY requirements-test.txt /tmp/\r
RUN pip install -r /tmp/requirements-test.txt\r
\r
+\r
RUN mkdir -p /src\r
COPY o2ims/ /src/o2ims/\r
COPY o2dms/ /src/o2dms/\r
\r
COPY tests/ /tests/\r
\r
+RUN apt-get install -y procps vim\r
+\r
WORKDIR /src\r
--- /dev/null
+FROM python:3.10-slim-buster\r
+\r
+RUN apt-get update; apt-get install -y git gcc\r
+\r
+# in case git repo is not accessable\r
+RUN mkdir -p /cgtsclient\r
+COPY temp/config /cgtsclient/\r
+RUN pip install -e cgtsclient/sysinv/cgts-client/cgts-client/\r
+\r
+RUN mkdir -p /distcloud-client\r
+COPY temp/distcloud-client /distcloud-client/\r
+RUN pip install -e /distcloud-client/distributedcloud-client\r
+# in case git repo is not accessable\r
+\r
+\r
+COPY requirements.txt /tmp/\r
+COPY constraints.txt /tmp/\r
+\r
+RUN pip install -r /tmp/requirements.txt -c /tmp/constraints.txt\r
+\r
+COPY requirements-test.txt /tmp/\r
+RUN pip install -r /tmp/requirements-test.txt\r
+\r
+\r
+RUN mkdir -p /src\r
+COPY o2ims/ /src/o2ims/\r
+COPY o2dms/ /src/o2dms/\r
+COPY o2common/ /src/o2common/\r
+COPY setup.py /src/\r
+\r
+# RUN pip install -e /src\r
+\r
+COPY tests/ /tests/\r
+\r
+RUN apt-get install -y procps vim\r
+\r
+WORKDIR /src\r
## Building containers\r
\r
+To accommodate the git repo access issue, the cgts-client and distributed client are\r
+cloned into temp before docker building\r
+\r
+```sh\r
+mkdir -p temp\r
+cd temp\r
+git clone https://opendev.org/starlingx/config.git\r
+git clone https://opendev.org/starlingx/distcloud-client.git\r
+cd -\r
+```\r
\r
```sh\r
docker-compose build\r
redis_pubsub:
build:
context: .
- dockerfile: Dockerfile
+ dockerfile: Dockerfile.localtest
image: o2imsdms-image
depends_on:
- postgres
ports:
- "5005:80"
+ watcher:
+ build:
+ context: .
+ dockerfile: Dockerfile.localtest
+ image: o2imsdms-image
+ depends_on:
+ - postgres
+ - redis
+ environment:
+ - DB_HOST=postgres
+ - DB_PASSWORD=o2ims123
+ - REDIS_HOST=redis
+ - PYTHONDONTWRITEBYTECODE=1
+ - OS_AUTH_URL=${OS_AUTH_URL}
+ - OS_USERNAME=${OS_USERNAME}
+ - OS_PASSWORD=${OS_PASSWORD}
+ volumes:
+ - ./o2ims:/o2ims
+ - ./o2dms:/o2dms
+ - ./o2common:/o2common
+ - ./tests:/tests
+ entrypoint:
+ - /bin/sh
+ - /o2ims/entrypoints/o2ims-watcher-entry.sh
+
postgres:
image: postgres:9.6
environment:
# Optional, Set\r
from o2ims.domain import stx_object as ocloudModel\r
from o2ims import config\r
+from o2ims.domain.resource_type import ResourceTypeEnum\r
\r
# from dcmanagerclient.api import client\r
from cgtsclient.client import get_client\r
super().__init__()\r
self.stxclient = stx_client if stx_client else self.getStxClient()\r
\r
- def getStxClient():\r
+ def getStxClient(self):\r
os_client_args = config.get_stx_access_info()\r
config_client = get_client(**os_client_args)\r
return config_client\r
def getInstanceInfo(self) -> ocloudModel.StxGenericModel:\r
systems = self.stxclient.isystem.list()\r
logger.debug("systems:" + str(systems[0].to_dict()))\r
- return ocloudModel.StxGenericModel(systems[0]) if systems else None\r
+ return ocloudModel.StxGenericModel(\r
+ ResourceTypeEnum.OCLOUD, systems[0]) if systems else None\r
\r
def getPserverList(self) -> List[ocloudModel.StxGenericModel]:\r
hosts = self.stxclient.ihost.list()\r
logger.debug("host 1:" + str(hosts[0].to_dict()))\r
- return [ocloudModel.StxGenericModel(self._hostconverter(host))\r
+ return [ocloudModel.StxGenericModel(\r
+ ResourceTypeEnum.PSERVER, self._hostconverter(host))\r
for host in hosts if host]\r
\r
def getPserver(self, id) -> ocloudModel.StxGenericModel:\r
host = self.stxclient.ihost.get(id)\r
logger.debug("host:" + str(host.to_dict()))\r
- return ocloudModel.StxGenericModel(self._hostconverter(host))\r
+ return ocloudModel.StxGenericModel(\r
+ ResourceTypeEnum.PSERVER, self._hostconverter(host))\r
\r
def getK8sList(self) -> List[ocloudModel.StxGenericModel]:\r
k8sclusters = self.stxclient.kube_cluster.list()\r
- logger.debug("k8sresources:" + str(k8sclusters[0].to_dict()))\r
-\r
- return [ocloudModel.StxGenericModel(self._k8sconverter(k8sres))\r
- for k8sres in k8sclusters if k8sres]\r
+ logger.debug("k8sresources[0]:" + str(k8sclusters[0].to_dict()))\r
+ return [ocloudModel.StxGenericModel(\r
+ ResourceTypeEnum.DMS,\r
+ self._k8sconverter(k8sres), self._k8shasher(k8sres))\r
+ for k8sres in k8sclusters if k8sres]\r
\r
def getK8sDetail(self, name) -> ocloudModel.StxGenericModel:\r
- k8scluster = self.stxclient.kube_cluster.get(name)\r
+ if not name:\r
+ k8sclusters = self.stxclient.kube_cluster.list()\r
+ # logger.debug("k8sresources[0]:" + str(k8sclusters[0].to_dict()))\r
+ k8scluster = k8sclusters.pop()\r
+ else:\r
+ k8scluster = self.stxclient.kube_cluster.get(name)\r
+\r
+ if not k8scluster:\r
+ return None\r
logger.debug("k8sresource:" + str(k8scluster.to_dict()))\r
- return ocloudModel.StxGenericModel(self._k8sconverter(k8scluster))\r
+ return ocloudModel.StxGenericModel(\r
+ ResourceTypeEnum.DMS,\r
+ self._k8sconverter(k8scluster), self._k8shasher(k8scluster))\r
\r
def getCpuList(self, hostid) -> List[ocloudModel.StxGenericModel]:\r
cpulist = self.stxclient.icpu.list(hostid)\r
- return [ocloudModel.StxGenericModel(self._cpuconverter(cpures))\r
- for cpures in cpulist if cpures]\r
+ return [ocloudModel.StxGenericModel(\r
+ ResourceTypeEnum.OCLOUD,\r
+ self._cpuconverter(cpures)) for cpures in cpulist if cpures]\r
\r
def getCpu(self, id) -> ocloudModel.StxGenericModel:\r
cpuinfo = self.stxclient.icpu.get(id)\r
- return ocloudModel.StxGenericModel(self._cpuconverter(cpuinfo))\r
+ return ocloudModel.StxGenericModel(\r
+ ResourceTypeEnum.OCLOUD, self._cpuconverter(cpuinfo))\r
\r
def _getIsystems(self):\r
return self.stxclient.isystem.list()\r
return cpu\r
\r
@staticmethod\r
- def _k8sconverter(host):\r
- setattr(host, "name", host.cluster_name)\r
- setattr(host, "uuid",\r
- uuid.uuid3(uuid.NAMESPACE_URL, host.cluster_name))\r
- setattr(host, 'updated_at', None)\r
- setattr(host, 'created_at', None)\r
+ def _k8sconverter(cluster):\r
+ setattr(cluster, "name", cluster.cluster_name)\r
+ setattr(cluster, "uuid",\r
+ uuid.uuid3(uuid.NAMESPACE_URL, cluster.cluster_name))\r
+ setattr(cluster, 'updated_at', None)\r
+ setattr(cluster, 'created_at', None)\r
logger.debug("k8s cluster name/uuid:" +\r
- host.name + "/" + str(host.uuid))\r
- return host\r
+ cluster.name + "/" + str(cluster.uuid))\r
+ return cluster\r
+\r
+ @staticmethod\r
+ def _k8shasher(cluster):\r
+ return str(hash((cluster.cluster_name,\r
+ cluster.cluster_api_endpoint, cluster.admin_user)))\r
\r
from sqlalchemy import (\r
Table,\r
- # MetaData,\r
+ MetaData,\r
Column,\r
# Integer,\r
String,\r
# Date,\r
DateTime,\r
+ # engine,\r
# ForeignKey,\r
# event,\r
+ Enum\r
)\r
\r
from sqlalchemy.orm import mapper\r
+# from sqlalchemy.sql.sqltypes import Integer\r
# from sqlalchemy.sql.expression import true\r
\r
from o2ims.domain import stx_object as ocloudModel\r
-from o2ims.adapter.orm import metadata\r
+# from o2ims.adapter.orm import metadata\r
+from o2ims.service.unit_of_work import AbstractUnitOfWork\r
+from o2ims.adapter.unit_of_work import SqlAlchemyUnitOfWork\r
+from o2ims.domain.resource_type import ResourceTypeEnum\r
\r
logger = logging.getLogger(__name__)\r
\r
-# metadata = MetaData()\r
+metadata = MetaData()\r
\r
stxobject = Table(\r
"stxcache",\r
metadata,\r
Column("id", String(255), primary_key=True),\r
+ Column("type", Enum(ResourceTypeEnum)),\r
Column("name", String(255)),\r
Column("updatetime", DateTime),\r
Column("createtime", DateTime),\r
- Column("content", String(255))\r
+ Column("hash", String(255)),\r
+ Column("content", String)\r
)\r
\r
\r
-def start_o2ims_stx_mappers():\r
+def start_o2ims_stx_mappers(uow: AbstractUnitOfWork = SqlAlchemyUnitOfWork()):\r
logger.info("Starting O2 IMS Stx mappers")\r
mapper(ocloudModel.StxGenericModel, stxobject)\r
+\r
+ with uow:\r
+ engine1 = uow.session.get_bind()\r
+ metadata.create_all(engine1)\r
+ uow.commit()\r
# See the License for the specific language governing permissions and
# limitations under the License.
-import abc
-from typing import List, Set
+from typing import List
# from o2ims.adapter import orm
from o2ims.domain import ocloud
-
-
-class OcloudRepository(abc.ABC):
- def __init__(self):
- self.seen = set() # type: Set[ocloud.Ocloud]
-
- def add(self, ocloud: ocloud.Ocloud):
- self._add(ocloud)
- self.seen.add(ocloud)
-
- def get(self, ocloudid) -> ocloud.Ocloud:
- ocloud = self._get(ocloudid)
- if ocloud:
- self.seen.add(ocloud)
- return ocloud
-
- def list(self) -> List[ocloud.Ocloud]:
- return self._list()
-
- def update(self, ocloud: ocloud.Ocloud):
- self._update(ocloud)
-
- # def update_fields(self, ocloudid: str, updatefields: dict):
- # self._update(ocloudid, updatefields)
-
- @abc.abstractmethod
- def _add(self, ocloud: ocloud.Ocloud):
- raise NotImplementedError
-
- @abc.abstractmethod
- def _get(self, ocloudid) -> ocloud.Ocloud:
- raise NotImplementedError
-
- @abc.abstractmethod
- def _update(self, ocloud: ocloud.Ocloud):
- raise NotImplementedError
+from o2ims.domain.ocloud_repo import OcloudRepository
class OcloudSqlAlchemyRepository(OcloudRepository):
oCloudId=ocloudid).first()
def _list(self) -> List[ocloud.Ocloud]:
- return self.session.query()
+ return self.session.query(ocloud.Ocloud).order_by(
+ ocloud.Ocloud.name).all()
def _update(self, ocloud: ocloud.Ocloud):
self.session.add(ocloud)
--- /dev/null
+# Copyright (C) 2021 Wind River Systems, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from typing import List
+# from o2ims.adapter import orm
+from o2ims.domain.stx_object import StxGenericModel
+from o2ims.domain.stx_repo import StxObjectRepository
+from o2ims.domain.resource_type import ResourceTypeEnum
+
+
+class StxObjectSqlAlchemyRepository(StxObjectRepository):
+ def __init__(self, session):
+ super().__init__()
+ self.session = session
+
+ def _add(self, stx_obj: StxGenericModel):
+ self.session.add(stx_obj)
+
+ def _get(self, stx_obj_id) -> StxGenericModel:
+ return self.session.query(StxGenericModel).filter_by(
+ id=stx_obj_id).first()
+
+ def _list(self, type: ResourceTypeEnum) -> List[StxGenericModel]:
+ return self.session.query(StxGenericModel).filter_by(
+ type=type).order_by(StxGenericModel.updatetime.desc()).all()
+
+ def _update(self, stx_obj: StxGenericModel):
+ self.session.add(stx_obj)
--- /dev/null
+# Copyright (C) 2021 Wind River Systems, Inc.\r
+#\r
+# Licensed under the Apache License, Version 2.0 (the "License");\r
+# you may not use this file except in compliance with the License.\r
+# You may obtain a copy of the License at\r
+#\r
+# http://www.apache.org/licenses/LICENSE-2.0\r
+#\r
+# Unless required by applicable law or agreed to in writing, software\r
+# distributed under the License is distributed on an "AS IS" BASIS,\r
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+# See the License for the specific language governing permissions and\r
+# limitations under the License.\r
+\r
+# pylint: disable=attribute-defined-outside-init\r
+from __future__ import annotations\r
+from sqlalchemy import create_engine\r
+from sqlalchemy.orm import sessionmaker\r
+from sqlalchemy.orm.session import Session\r
+\r
+from o2ims import config\r
+from o2ims.adapter.ocloud_repository import OcloudSqlAlchemyRepository\r
+from o2ims.adapter.stx_repository import StxObjectSqlAlchemyRepository\r
+from o2ims.service.unit_of_work import AbstractUnitOfWork\r
+\r
+\r
+DEFAULT_SESSION_FACTORY = sessionmaker(\r
+ bind=create_engine(\r
+ config.get_postgres_uri(),\r
+ isolation_level="REPEATABLE READ",\r
+ )\r
+)\r
+\r
+\r
+class SqlAlchemyUnitOfWork(AbstractUnitOfWork):\r
+ def __init__(self, session_factory=DEFAULT_SESSION_FACTORY):\r
+ self.session_factory = session_factory\r
+\r
+ def __enter__(self):\r
+ self.session = self.session_factory() # type: Session\r
+ self.oclouds = OcloudSqlAlchemyRepository(self.session)\r
+ self.stxobjects = StxObjectSqlAlchemyRepository(self.session)\r
+ return super().__enter__()\r
+\r
+ def __exit__(self, *args):\r
+ super().__exit__(*args)\r
+ self.session.close()\r
+\r
+ def _commit(self):\r
+ self.session.commit()\r
+\r
+ def rollback(self):\r
+ self.session.rollback()\r
SmoO2Notifications
from o2ims.service import handlers, messagebus, unit_of_work
+from o2ims.adapter.unit_of_work import SqlAlchemyUnitOfWork
+from o2ims.adapter.clients import orm_stx
def bootstrap(
start_orm: bool = True,
- uow: unit_of_work.AbstractUnitOfWork = unit_of_work.SqlAlchemyUnitOfWork(),
+ uow: unit_of_work.AbstractUnitOfWork = SqlAlchemyUnitOfWork(),
notifications: AbstractNotifications = None,
publish: Callable = redis_eventpublisher.publish,
) -> messagebus.MessageBus:
if start_orm:
orm.start_o2ims_mappers()
+ orm_stx.start_o2ims_stx_mappers(uow)
dependencies = {"uow": uow, "notifications": notifications,
"publish": publish}
--- /dev/null
+# Copyright (C) 2021 Wind River Systems, Inc.\r
+#\r
+# Licensed under the Apache License, Version 2.0 (the "License");\r
+# you may not use this file except in compliance with the License.\r
+# You may obtain a copy of the License at\r
+#\r
+# http://www.apache.org/licenses/LICENSE-2.0\r
+#\r
+# Unless required by applicable law or agreed to in writing, software\r
+# distributed under the License is distributed on an "AS IS" BASIS,\r
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+# See the License for the specific language governing permissions and\r
+# limitations under the License.\r
+\r
+import abc\r
+from typing import List, Set\r
+from o2ims.domain import ocloud\r
+\r
+\r
+class OcloudRepository(abc.ABC):\r
+ def __init__(self):\r
+ self.seen = set() # type: Set[ocloud.Ocloud]\r
+\r
+ def add(self, ocloud: ocloud.Ocloud):\r
+ self._add(ocloud)\r
+ self.seen.add(ocloud)\r
+\r
+ def get(self, ocloudid) -> ocloud.Ocloud:\r
+ ocloud = self._get(ocloudid)\r
+ if ocloud:\r
+ self.seen.add(ocloud)\r
+ return ocloud\r
+\r
+ def list(self) -> List[ocloud.Ocloud]:\r
+ return self._list()\r
+\r
+ def update(self, ocloud: ocloud.Ocloud):\r
+ self._update(ocloud)\r
+\r
+ # def update_fields(self, ocloudid: str, updatefields: dict):\r
+ # self._update(ocloudid, updatefields)\r
+\r
+ @abc.abstractmethod\r
+ def _add(self, ocloud: ocloud.Ocloud):\r
+ raise NotImplementedError\r
+\r
+ @abc.abstractmethod\r
+ def _get(self, ocloudid) -> ocloud.Ocloud:\r
+ raise NotImplementedError\r
+\r
+ @abc.abstractmethod\r
+ def _update(self, ocloud: ocloud.Ocloud):\r
+ raise NotImplementedError\r
\r
\r
class ResourceTypeEnum(Enum):\r
- PSERVER = 1\r
- PSERVER_CPU = 2\r
- PSERVER_RAM = 3\r
+ OCLOUD = 1,\r
+ RESOURCE_POOL = 2,\r
+ DMS = 3,\r
+ PSERVER = 11\r
+ PSERVER_CPU = 12\r
+ PSERVER_RAM = 13\r
# limitations under the License.\r
\r
# from dataclasses import dataclass\r
-# import datetime\r
+import datetime\r
import json\r
\r
+from o2ims.domain.resource_type import ResourceTypeEnum\r
+import logging\r
+logger = logging.getLogger(__name__)\r
+\r
\r
class MismatchedModel(Exception):\r
pass\r
\r
\r
class StxGenericModel:\r
- def __init__(self, api_response: dict = None) -> None:\r
+ def __init__(self, type: ResourceTypeEnum,\r
+ api_response: dict = None, content_hash=None) -> None:\r
if api_response:\r
self.id = api_response.uuid\r
- self.content = json.dumps(api_response.to_dict())\r
- self.updatetime = api_response.updated_at\r
- self.createtime = api_response.created_at\r
+ self.type = type\r
+ self.updatetime = datetime.datetime.strptime(\r
+ api_response.updated_at.split('.')[0], "%Y-%m-%dT%H:%M:%S") \\r
+ if api_response.updated_at else None\r
+ self.createtime = datetime.datetime.strptime(\r
+ api_response.created_at.split('.')[0], "%Y-%m-%dT%H:%M:%S") \\r
+ if api_response.created_at else None\r
self.name = api_response.name\r
+ self.hash = content_hash if content_hash \\r
+ else str(hash((self.id, self.updatetime)))\r
+ self.content = json.dumps(api_response.to_dict())\r
\r
def is_outdated(self, newmodel) -> bool:\r
- return self.updatetime < newmodel.updatetime\r
+ # return self.updatetime < newmodel.updatetime\r
+ # logger.warning("hash1: " + self.hash + " vs hash2: " + newmodel.hash)\r
+ return self.hash != newmodel.hash\r
\r
def update_by(self, newmodel) -> None:\r
if self.id != newmodel.id:\r
raise MismatchedModel("Mismatched model")\r
self.name = newmodel.name\r
-\r
- self.content = newmodel.content\r
self.createtime = newmodel.createtime\r
self.updatetime = newmodel.updatetime\r
-\r
-\r
-class StxK8sClusterModel(StxGenericModel):\r
- def __init__(self, api_response: dict = None) -> None:\r
- super().__init__(api_response=api_response)\r
-\r
- def is_outdated(self, newmodel) -> bool:\r
- # never outdated since lack of such evidence\r
- return False\r
+ self.content = newmodel.content\r
--- /dev/null
+# Copyright (C) 2021 Wind River Systems, Inc.\r
+#\r
+# Licensed under the Apache License, Version 2.0 (the "License");\r
+# you may not use this file except in compliance with the License.\r
+# You may obtain a copy of the License at\r
+#\r
+# http://www.apache.org/licenses/LICENSE-2.0\r
+#\r
+# Unless required by applicable law or agreed to in writing, software\r
+# distributed under the License is distributed on an "AS IS" BASIS,\r
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+# See the License for the specific language governing permissions and\r
+# limitations under the License.\r
+\r
+import abc\r
+from typing import List, Set\r
+from o2ims.domain.stx_object import StxGenericModel\r
+from o2ims.domain.resource_type import ResourceTypeEnum\r
+\r
+\r
+class StxObjectRepository(abc.ABC):\r
+ def __init__(self):\r
+ self.seen = set() # type: Set[StxGenericModel]\r
+\r
+ def add(self, stx_obj: StxGenericModel):\r
+ self._add(stx_obj)\r
+ self.seen.add(stx_obj)\r
+\r
+ def get(self, stx_obj_id) -> StxGenericModel:\r
+ stx_obj = self._get(stx_obj_id)\r
+ if stx_obj:\r
+ self.seen.add(stx_obj)\r
+ return stx_obj\r
+\r
+ def list(self, type: ResourceTypeEnum) -> List[StxGenericModel]:\r
+ return self._list(type)\r
+\r
+ def update(self, stx_obj: StxGenericModel):\r
+ self._update(stx_obj)\r
+\r
+ # def update_fields(self, stx_obj_id: str, updatefields: dict):\r
+ # self._update(stx_obj_id, updatefields)\r
+\r
+ @abc.abstractmethod\r
+ def _add(self, stx_obj: StxGenericModel):\r
+ raise NotImplementedError\r
+\r
+ @abc.abstractmethod\r
+ def _get(self, stx_obj_id) -> StxGenericModel:\r
+ raise NotImplementedError\r
+\r
+ @abc.abstractmethod\r
+ def _update(self, stx_obj: StxGenericModel):\r
+ raise NotImplementedError\r
+\r
+ @abc.abstractmethod\r
+ def _list(self, type: ResourceTypeEnum):\r
+ raise NotImplementedError\r
# from o2ims.service.handlers import InvalidResourceType\r
from o2ims import bootstrap, config\r
from o2ims.views import ocloud_view\r
-from o2ims.service.watcher.executor import start_watchers\r
\r
app = Flask(__name__)\r
bus = bootstrap.bootstrap()\r
apibase = config.get_o2ims_api_base()\r
-start_watchers()\r
\r
\r
@app.route(apibase, methods=["GET"])\r
--- /dev/null
+#!/bin/sh
+
+# pip install -e /src
+# python /o2ims/entrypoints/resource_watcher.py
+
+cp -r /o2ims/* /src/o2ims
+pip install -e /src
+python /o2ims/entrypoints/resource_watcher.py
import cotyledon\r
\r
from o2ims.service.watcher.worker import PollWorker\r
-from o2ims.service.watcher.base import OcloudWather\r
+from o2ims.service.watcher.base import OcloudWatcher\r
from o2ims.service.watcher.base import DmsWatcher\r
+# from o2ims.service.client.base_client import BaseClient\r
+from o2ims.adapter.clients.ocloud_sa_client import StxSaDmsClient\r
+from o2ims.adapter.clients.ocloud_sa_client import StxSaOcloudClient\r
+\r
+from o2ims import bootstrap\r
+# from o2ims import config\r
+# import redis\r
\r
import logging\r
logger = logging.getLogger(__name__)\r
\r
+# r = redis.Redis(**config.get_redis_host_and_port())\r
+\r
\r
class WatcherService(cotyledon.Service):\r
- def __init__(self, worker_id, args) -> None:\r
+ def __init__(self, worker_id, args=None) -> None:\r
super().__init__(worker_id)\r
self.args = args\r
+ self.bus = bootstrap.bootstrap()\r
self.worker = PollWorker()\r
+ # self.stxrepo = self.bus.uow.stxobjects\r
+ # tbd: 1 client per resource pool\r
+ # self.client = StxSaOcloudClient()\r
\r
def run(self):\r
try:\r
- self.worker.add_watcher(OcloudWather())\r
- self.worker.add_watcher(DmsWatcher())\r
+ self.worker.add_watcher(OcloudWatcher(StxSaOcloudClient(),\r
+ self.bus.uow))\r
+ self.worker.add_watcher(DmsWatcher(StxSaDmsClient(),\r
+ self.bus.uow))\r
self.worker.start()\r
except Exception as ex:\r
- logger.warning(ex.message)\r
+ logger.warning("WorkerService Exception:" + str(ex))\r
finally:\r
self.worker.stop()\r
\r
\r
-def start_watchers(sm=None):\r
+def start_watchers(sm: cotyledon.ServiceManager = None):\r
watchersm = sm if sm else cotyledon.ServiceManager()\r
watchersm.add(WatcherService, workers=1, args=())\r
- return watchersm\r
+ watchersm.run()\r
+\r
+\r
+def main():\r
+ logger.info("Resource watcher starting")\r
+ start_watchers()\r
+\r
+\r
+if __name__ == "__main__":\r
+ main()\r
# pylint: disable=attribute-defined-outside-init
from __future__ import annotations
import abc
-from sqlalchemy import create_engine
-from sqlalchemy.orm import sessionmaker
-from sqlalchemy.orm.session import Session
-from o2ims import config
-from o2ims.adapter import ocloud_repository
+from o2ims.domain.ocloud_repo import OcloudRepository
+from o2ims.domain.stx_repo import StxObjectRepository
class AbstractUnitOfWork(abc.ABC):
- oclouds: ocloud_repository.OcloudRepository
+ oclouds: OcloudRepository
+ stxobjects: StxObjectRepository
def __enter__(self):
return self
@abc.abstractmethod
def rollback(self):
raise NotImplementedError
-
-
-DEFAULT_SESSION_FACTORY = sessionmaker(
- bind=create_engine(
- config.get_postgres_uri(),
- isolation_level="REPEATABLE READ",
- )
-)
-
-
-class SqlAlchemyUnitOfWork(AbstractUnitOfWork):
- def __init__(self, session_factory=DEFAULT_SESSION_FACTORY):
- self.session_factory = session_factory
-
- def __enter__(self):
- self.session = self.session_factory() # type: Session
- self.oclouds = ocloud_repository\
- .OcloudSqlAlchemyRepository(self.session)
- return super().__enter__()
-
- def __exit__(self, *args):
- super().__exit__(*args)
- self.session.close()
-
- def _commit(self):
- self.session.commit()
-
- def rollback(self):
- self.session.rollback()
# See the License for the specific language governing permissions and\r
# limitations under the License.\r
\r
+from o2ims.domain.resource_type import ResourceTypeEnum\r
from o2ims.service.client.base_client import BaseClient\r
from o2ims.domain.stx_object import StxGenericModel\r
-from o2ims.adapter.ocloud_repository import OcloudRepository\r
+from o2ims.service.unit_of_work import AbstractUnitOfWork\r
+\r
+import logging\r
+logger = logging.getLogger(__name__)\r
\r
\r
class InvalidOcloudState(Exception):\r
raise NotImplementedError\r
\r
\r
-class OcloudWather(BaseWatcher):\r
+class OcloudWatcher(BaseWatcher):\r
def __init__(self, ocloud_client: BaseClient,\r
- repo: OcloudRepository) -> None:\r
+ uow: AbstractUnitOfWork) -> None:\r
super().__init__(ocloud_client)\r
- self._repo = repo\r
+ self._uow = uow\r
\r
def _targetname(self):\r
return "ocloud"\r
self._compare_and_update(ocloudmodel)\r
\r
def _compare_and_update(self, ocloudmodel: StxGenericModel) -> bool:\r
- # localmodel = self._repo.get(ocloudmodel.id)\r
- oclouds = self._repo.list()\r
- if len(oclouds) > 1:\r
- raise InvalidOcloudState("More than 1 ocloud is found")\r
- if len(oclouds) == 0:\r
- self._repo.add(ocloudmodel)\r
- else:\r
- localmodel = oclouds.pop()\r
- if localmodel.is_outdated(ocloudmodel):\r
- localmodel.update_by(ocloudmodel)\r
- self._repo.update(localmodel)\r
+ with self._uow:\r
+ # localmodel = self._uow.stxobjects.get(str(ocloudmodel.id))\r
+ oclouds = self._uow.stxobjects.list(ResourceTypeEnum.OCLOUD)\r
+ if len(oclouds) > 1:\r
+ raise InvalidOcloudState("More than 1 ocloud is found")\r
+ if len(oclouds) == 0:\r
+ logger.info("add ocloud:" + ocloudmodel.name\r
+ + " update_at: " + str(ocloudmodel.updatetime)\r
+ + " id: " + str(ocloudmodel.id)\r
+ + " hash: " + str(ocloudmodel.hash))\r
+ self._uow.stxobjects.add(ocloudmodel)\r
+ else:\r
+ localmodel = oclouds.pop()\r
+ if localmodel.is_outdated(ocloudmodel):\r
+ logger.info("update ocloud:" + ocloudmodel.name\r
+ + " update_at: " + str(ocloudmodel.updatetime)\r
+ + " id: " + str(ocloudmodel.id)\r
+ + " hash: " + str(ocloudmodel.hash))\r
+ localmodel.update_by(ocloudmodel)\r
+ self._uow.stxobjects.update(localmodel)\r
+ self._uow.commit()\r
\r
\r
class DmsWatcher(BaseWatcher):\r
- def __init__(self, client: BaseClient) -> None:\r
+ def __init__(self, client: BaseClient,\r
+ uow: AbstractUnitOfWork) -> None:\r
super().__init__(client)\r
+ self._uow = uow\r
\r
def _targetname(self):\r
return "dms"\r
\r
+ def _probe(self):\r
+ ocloudmodel = self._client.get(None)\r
+ if ocloudmodel:\r
+ self._compare_and_update(ocloudmodel)\r
+\r
+ def _compare_and_update(self, newmodel: StxGenericModel) -> bool:\r
+ with self._uow:\r
+ # localmodel = self._uow.stxobjects.get(ocloudmodel.id)\r
+ localmodel = self._uow.stxobjects.get(str(newmodel.id))\r
+ if not localmodel:\r
+ logger.info("add dms:" + newmodel.name)\r
+ self._uow.stxobjects.add(newmodel)\r
+ elif localmodel.is_outdated(newmodel):\r
+ logger.info("update dms:" + newmodel.name)\r
+ localmodel.update_by(newmodel)\r
+ self._uow.stxobjects.update(newmodel)\r
+ self._uow.commit()\r
+\r
\r
class ResourcePoolWatcher(BaseWatcher):\r
- def __init__(self) -> None:\r
+ def __init__(self, client: BaseClient,\r
+ uow: AbstractUnitOfWork) -> None:\r
super().__init__()\r
+ self._uow = uow\r
\r
def _targetname(self):\r
- return "ocloud"\r
+ return "resourcepool"\r
+\r
+ def _probe(self):\r
+ ocloudmodel = self._client.get(None)\r
+ if ocloudmodel:\r
+ logger.info("detect ocloudmodel:" + ocloudmodel.name)\r
+ self._compare_and_update(ocloudmodel)\r
+\r
+ def _compare_and_update(self, newmodel: StxGenericModel) -> bool:\r
+ with self._uow:\r
+ # localmodel = self._uow.stxobjects.get(ocloudmodel.id)\r
+ localmodel = self._uow.stxobjects.get(str(newmodel.id))\r
+ if not localmodel:\r
+ self._uow.stxobjects.add(newmodel)\r
+ elif localmodel.is_outdated(newmodel):\r
+ localmodel.update_by(newmodel)\r
+ self._uow.stxobjects.update(newmodel)\r
+ self._uow.commit()\r
\r
\r
class ResourceWatcher(BaseWatcher):\r
- def __init__(self) -> None:\r
+ def __init__(self, client: BaseClient,\r
+ uow: AbstractUnitOfWork) -> None:\r
super().__init__()\r
+ self._uow = uow\r
\r
def _targetname(self):\r
return "resource"\r
+\r
+ def _probe(self):\r
+ ocloudmodel = self._client.get(None)\r
+ if ocloudmodel:\r
+ self._compare_and_update(ocloudmodel)\r
+\r
+ def _compare_and_update(self, newmodel: StxGenericModel) -> bool:\r
+ with self._uow:\r
+ # localmodel = self._repo.get(ocloudmodel.id)\r
+ localmodel = self._uow.stxobjects.get(str(newmodel.id))\r
+ if not localmodel:\r
+ self._uow.stxobjects.add(newmodel)\r
+ elif localmodel.is_outdated(newmodel):\r
+ localmodel.update_by(newmodel)\r
+ self._uow.stxobjects.update(newmodel)\r
+ self._uow.commit()\r
logger.debug("about to probe:"+w)\r
self.watchers[w].probe()\r
except Exception as ex:\r
- logger.warning(ex.message)\r
+ logger.warning("Worker:" + w + " raises exception:" + str(ex))\r
continue\r
self.schedinstance.enter(self.schedinterval, 1, self._repeat)\r
\r
\r
def stop(self):\r
self._stopped = True\r
-\r
-\r
-defaultworker = PollWorker()\r
from o2ims.service import unit_of_work\r
\r
\r
-def ocloud_one(ocloudid: str, uow: unit_of_work.SqlAlchemyUnitOfWork):\r
+def ocloud_one(ocloudid: str, uow: unit_of_work.AbstractUnitOfWork):\r
with uow:\r
results = uow.session.execute(\r
"""\r
return dict(results[0]) if len(results) > 0 else None\r
\r
\r
-def oclouds(uow: unit_of_work.SqlAlchemyUnitOfWork):\r
+def oclouds(uow: unit_of_work.AbstractUnitOfWork):\r
with uow:\r
results = uow.session.execute(\r
"""\r
--- /dev/null
+-e git+https://opendev.org/starlingx/distcloud-client.git@master#egg=distributedcloud-client&subdirectory=distributedcloud-client\r
+-e git+https://opendev.org/starlingx/config.git@master#egg=cgtsclient&subdirectory=sysinv/cgts-client/cgts-client#\r
\r
Cython>=3.0a1\r
\r
+\r
+httplib2\r
+babel\r
+PrettyTable<0.8,>=0.7.2\r
# -e git+https://opendev.org/starlingx/distcloud-client.git@master#egg=distributedcloud-client&subdirectory=distributedcloud-client\r
# -e git+https://opendev.org/starlingx/config.git@master#egg=cgtsclient&subdirectory=sysinv/cgts-client/cgts-client#\r
--- /dev/null
+# Copyright (C) 2021 Wind River Systems, Inc.\r
+#\r
+# Licensed under the Apache License, Version 2.0 (the "License");\r
+# you may not use this file except in compliance with the License.\r
+# You may obtain a copy of the License at\r
+#\r
+# http://www.apache.org/licenses/LICENSE-2.0\r
+#\r
+# Unless required by applicable law or agreed to in writing, software\r
+# distributed under the License is distributed on an "AS IS" BASIS,\r
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+# See the License for the specific language governing permissions and\r
+# limitations under the License.\r
+\r
+from multiprocessing.queues import Queue\r
+import pytest\r
+from o2ims.entrypoints.resource_watcher import start_watchers\r
+from multiprocessing import Process\r
+from multiprocessing import Pipe\r
+# pipe = Pipe()\r
+# q = Queue()\r
+import time\r
+# pytestmark = pytest.mark.usefixtures("mappers")\r
+\r
+\r
+def test_watcher_service():\r
+ testedprocess = Process(target=start_watchers, args=())\r
+ testedprocess.start()\r
+ time.sleep(10)\r
+ testedprocess.terminate()\r
import json\r
from o2ims.adapter.clients.ocloud_sa_client import StxSaOcloudClient\r
from o2ims.domain import stx_object as ocloudModel\r
+from o2ims.domain.resource_type import ResourceTypeEnum\r
\r
# pytestmark = pytest.mark.usefixtures("mappers")\r
\r
super().__init__()\r
\r
def getInstanceInfo(self) -> ocloudModel.StxGenericModel:\r
- model = ocloudModel.StxGenericModel()\r
+ model = ocloudModel.StxGenericModel(ResourceTypeEnum.OCLOUD)\r
model.id = uuid.uuid4()\r
model.name = "stx1"\r
model.updatetime = datetime.now\r
# See the License for the specific language governing permissions and
# limitations under the License.
-import pytest
from o2ims.domain import ocloud
from o2ims import config
import uuid
from datetime import datetime\r
import json\r
from typing import List\r
+from o2ims.domain.resource_type import ResourceTypeEnum\r
from o2ims.service.client.base_client import BaseClient\r
-import pytest\r
from o2ims.domain import ocloud\r
from o2ims import config\r
import uuid\r
-from o2ims.service.watcher.base import BaseWatcher, OcloudWather\r
+from o2ims.service.watcher.base import BaseWatcher, OcloudWatcher\r
from o2ims.domain import stx_object as ocloudModel\r
from o2ims.adapter.ocloud_repository import OcloudRepository\r
+from o2ims.domain.stx_repo import StxObjectRepository\r
from o2ims.service.watcher import worker\r
-from o2ims.service.watcher.executor import start_watchers\r
+from o2ims.service.unit_of_work import AbstractUnitOfWork\r
+\r
\r
class FakeOcloudClient(BaseClient):\r
def __init__(self):\r
super().__init__()\r
- fakeCloud = ocloudModel.StxGenericModel()\r
+ fakeCloud = ocloudModel.StxGenericModel(ResourceTypeEnum.OCLOUD)\r
fakeCloud.id = uuid.uuid4()\r
fakeCloud.name = 'stx1'\r
fakeCloud.content = json.dumps({})\r
fakeCloud.createtime = datetime.now()\r
- fakeCloud.updatetime = datetime.now\r
+ fakeCloud.updatetime = datetime.now()\r
+ fakeCloud.hash = str(hash((fakeCloud.id, fakeCloud.updatetime)))\r
self.fakeCloud = fakeCloud\r
\r
def _get(self, id) -> ocloudModel.StxGenericModel:\r
def _list(self):\r
return [self.fakeCloud]\r
\r
+\r
class FakeOcloudRepo(OcloudRepository):\r
def __init__(self):\r
super().__init__()\r
ocloud1 = filtered.pop()\r
ocloud1.update_by(ocloud)\r
\r
+\r
+\r
+class FakeStxObjRepo(StxObjectRepository):\r
+ def __init__(self):\r
+ super().__init__()\r
+ self.oclouds = []\r
+\r
+ def _add(self, ocloud: ocloud.Ocloud):\r
+ self.oclouds.append(ocloud)\r
+\r
+ def _get(self, ocloudid) -> ocloud.Ocloud:\r
+ filtered = [o for o in self.oclouds if o.id == ocloudid]\r
+ return filtered.pop()\r
+\r
+ def _list(self, type: ResourceTypeEnum) -> List[ocloud.Ocloud]:\r
+ return [x for x in self.oclouds]\r
+\r
+ def _update(self, ocloud: ocloud.Ocloud):\r
+ filtered = [o for o in self.oclouds if o.id == ocloud.id]\r
+ assert len(filtered) == 1\r
+ ocloud1 = filtered.pop()\r
+ ocloud1.update_by(ocloud)\r
+\r
+\r
+class FakeUnitOfWork(AbstractUnitOfWork):\r
+ def __init__(self):\r
+ pass\r
+\r
+ def __enter__(self):\r
+ # self.session = self.session_factory() # type: Session\r
+ # self.oclouds = OcloudSqlAlchemyRepository(self.session)\r
+ self.stxobjects = FakeStxObjRepo()\r
+ return super().__enter__()\r
+\r
+ def __exit__(self, *args):\r
+ super().__exit__(*args)\r
+ # self.session.close()\r
+\r
+ def _commit(self):\r
+ pass\r
+ # self.session.commit()\r
+\r
+ def rollback(self):\r
+ pass\r
+ # self.session.rollback()\r
+\r
+\r
def test_probe_new_ocloud():\r
- fakeRepo = FakeOcloudRepo()\r
+ # fakeRepo = FakeOcloudRepo()\r
+ fakeuow = FakeUnitOfWork()\r
fakeClient = FakeOcloudClient()\r
- ocloudwatcher = OcloudWather(fakeClient, fakeRepo)\r
+ ocloudwatcher = OcloudWatcher(fakeClient, fakeuow)\r
ocloudwatcher.probe()\r
- assert len(fakeRepo.oclouds) == 1\r
- assert fakeRepo.oclouds[0].name == "stx1"\r
+ assert len(fakeuow.stxobjects.oclouds) == 1\r
+ assert fakeuow.stxobjects.oclouds[0].name == "stx1"\r
+\r
\r
-def test_default_worker():\r
+def test_watchers_worker():\r
+ testedworker = worker.PollWorker()\r
\r
class FakeOCloudWatcher(BaseWatcher):\r
def __init__(self, client: BaseClient,\r
\r
def _targetname(self):\r
return "fakeocloudwatcher"\r
- \r
+\r
def _probe(self):\r
self.fakeOcloudWatcherCounter += 1\r
# hacking to stop the blocking sched task\r
if self.fakeOcloudWatcherCounter > 2:\r
- worker.defaultworker.stop()\r
+ testedworker.stop()\r
+\r
\r
+ # fakeRepo = FakeOcloudRepo()\r
+ fakeuow = FakeUnitOfWork()\r
\r
- fakeRepo = FakeOcloudRepo()\r
fakeClient = FakeOcloudClient()\r
- fakewatcher = FakeOCloudWatcher(fakeClient, fakeRepo)\r
+ fakewatcher = FakeOCloudWatcher(fakeClient, fakeuow)\r
\r
- worker.defaultworker.set_interval(1)\r
- worker.defaultworker.add_watcher(fakewatcher)\r
+ testedworker.set_interval(1)\r
+ testedworker.add_watcher(fakewatcher)\r
assert fakewatcher.fakeOcloudWatcherCounter == 0\r
\r
count1 = fakewatcher.fakeOcloudWatcherCounter\r
- worker.defaultworker.start()\r
+ testedworker.start()\r
time.sleep(20)\r
assert fakewatcher.fakeOcloudWatcherCounter > count1\r
\r