From: Bin Yang Date: Sat, 6 Nov 2021 01:38:59 +0000 (+0800) Subject: Update watcher worker X-Git-Tag: 1.0.0~35 X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=commitdiff_plain;h=8be81dfad35b08c4de77168e885bb18253069771;p=pti%2Fo2.git Update watcher worker Issue-ID: INF-196 Signed-off-by: Bin Yang Change-Id: Ifcfea06b3f6263018e1d43d595f8b2f1545a54fe --- diff --git a/.gitignore b/.gitignore index c46d404..91ffe40 100644 --- a/.gitignore +++ b/.gitignore @@ -5,3 +5,4 @@ __pycache__ *.pyc .tox *.log +temp \ No newline at end of file diff --git a/Dockerfile b/Dockerfile index 593e21f..75e2363 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,14 +1,32 @@ FROM python:3.10-slim-buster RUN apt-get update; apt-get install -y git gcc + +# in case git repo is not accessable +# RUN mkdir -p /cgtsclient +# COPY temp/config /cgtsclient/ +RUN git clone https://opendev.org/starlingx/config.git /cgtsclient +RUN pip install -e /cgtsclient/sysinv/cgts-client/cgts-client/ + +# RUN mkdir -p /distcloud-client +# COPY temp/distcloud-client /distcloud-client/ +RUN git clone https://opendev.org/starlingx/distcloud-client.git /distcloud-client/ +RUN pip install -e /distcloud-client/distributedcloud-client +# in case git repo is not accessable + + COPY requirements.txt /tmp/ +COPY requirements-stx.txt /tmp/ COPY constraints.txt /tmp/ -RUN pip install -r /tmp/requirements.txt -c /tmp/constraints.txt +RUN pip install -r /tmp/requirements.txt -c /tmp/constraints.txt + +# RUN pip install -r /tmp/requirements-stx.txt COPY requirements-test.txt /tmp/ RUN pip install -r /tmp/requirements-test.txt + RUN mkdir -p /src COPY o2ims/ /src/o2ims/ COPY o2dms/ /src/o2dms/ @@ -19,4 +37,6 @@ RUN pip install -e /src COPY tests/ /tests/ +RUN apt-get install -y procps vim + WORKDIR /src diff --git a/Dockerfile.localtest b/Dockerfile.localtest new file mode 100644 index 0000000..c61c1db --- /dev/null +++ b/Dockerfile.localtest @@ -0,0 +1,37 @@ +FROM python:3.10-slim-buster + +RUN apt-get update; apt-get install -y git gcc + +# in case git repo is not accessable +RUN mkdir -p /cgtsclient +COPY temp/config /cgtsclient/ +RUN pip install -e cgtsclient/sysinv/cgts-client/cgts-client/ + +RUN mkdir -p /distcloud-client +COPY temp/distcloud-client /distcloud-client/ +RUN pip install -e /distcloud-client/distributedcloud-client +# in case git repo is not accessable + + +COPY requirements.txt /tmp/ +COPY constraints.txt /tmp/ + +RUN pip install -r /tmp/requirements.txt -c /tmp/constraints.txt + +COPY requirements-test.txt /tmp/ +RUN pip install -r /tmp/requirements-test.txt + + +RUN mkdir -p /src +COPY o2ims/ /src/o2ims/ +COPY o2dms/ /src/o2dms/ +COPY o2common/ /src/o2common/ +COPY setup.py /src/ + +# RUN pip install -e /src + +COPY tests/ /tests/ + +RUN apt-get install -y procps vim + +WORKDIR /src diff --git a/README.md b/README.md index fdc1d39..1612e8e 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,15 @@ ## Building containers +To accommodate the git repo access issue, the cgts-client and distributed client are +cloned into temp before docker building + +```sh +mkdir -p temp +cd temp +git clone https://opendev.org/starlingx/config.git +git clone https://opendev.org/starlingx/distcloud-client.git +cd - +``` ```sh docker-compose build diff --git a/docker-compose.yml b/docker-compose.yml index 4ffc2f1..c7ca8b7 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -5,7 +5,7 @@ services: redis_pubsub: build: context: . - dockerfile: Dockerfile + dockerfile: Dockerfile.localtest image: o2imsdms-image depends_on: - postgres @@ -56,6 +56,31 @@ services: ports: - "5005:80" + watcher: + build: + context: . + dockerfile: Dockerfile.localtest + image: o2imsdms-image + depends_on: + - postgres + - redis + environment: + - DB_HOST=postgres + - DB_PASSWORD=o2ims123 + - REDIS_HOST=redis + - PYTHONDONTWRITEBYTECODE=1 + - OS_AUTH_URL=${OS_AUTH_URL} + - OS_USERNAME=${OS_USERNAME} + - OS_PASSWORD=${OS_PASSWORD} + volumes: + - ./o2ims:/o2ims + - ./o2dms:/o2dms + - ./o2common:/o2common + - ./tests:/tests + entrypoint: + - /bin/sh + - /o2ims/entrypoints/o2ims-watcher-entry.sh + postgres: image: postgres:9.6 environment: diff --git a/o2ims/adapter/clients/ocloud_sa_client.py b/o2ims/adapter/clients/ocloud_sa_client.py index 5e9e64c..e8c48a3 100644 --- a/o2ims/adapter/clients/ocloud_sa_client.py +++ b/o2ims/adapter/clients/ocloud_sa_client.py @@ -20,6 +20,7 @@ from typing import List # Optional, Set from o2ims.domain import stx_object as ocloudModel from o2ims import config +from o2ims.domain.resource_type import ResourceTypeEnum # from dcmanagerclient.api import client from cgtsclient.client import get_client @@ -101,7 +102,7 @@ class StxSaClientImp(object): super().__init__() self.stxclient = stx_client if stx_client else self.getStxClient() - def getStxClient(): + def getStxClient(self): os_client_args = config.get_stx_access_info() config_client = get_client(**os_client_args) return config_client @@ -109,39 +110,55 @@ class StxSaClientImp(object): def getInstanceInfo(self) -> ocloudModel.StxGenericModel: systems = self.stxclient.isystem.list() logger.debug("systems:" + str(systems[0].to_dict())) - return ocloudModel.StxGenericModel(systems[0]) if systems else None + return ocloudModel.StxGenericModel( + ResourceTypeEnum.OCLOUD, systems[0]) if systems else None def getPserverList(self) -> List[ocloudModel.StxGenericModel]: hosts = self.stxclient.ihost.list() logger.debug("host 1:" + str(hosts[0].to_dict())) - return [ocloudModel.StxGenericModel(self._hostconverter(host)) + return [ocloudModel.StxGenericModel( + ResourceTypeEnum.PSERVER, self._hostconverter(host)) for host in hosts if host] def getPserver(self, id) -> ocloudModel.StxGenericModel: host = self.stxclient.ihost.get(id) logger.debug("host:" + str(host.to_dict())) - return ocloudModel.StxGenericModel(self._hostconverter(host)) + return ocloudModel.StxGenericModel( + ResourceTypeEnum.PSERVER, self._hostconverter(host)) def getK8sList(self) -> List[ocloudModel.StxGenericModel]: k8sclusters = self.stxclient.kube_cluster.list() - logger.debug("k8sresources:" + str(k8sclusters[0].to_dict())) - - return [ocloudModel.StxGenericModel(self._k8sconverter(k8sres)) - for k8sres in k8sclusters if k8sres] + logger.debug("k8sresources[0]:" + str(k8sclusters[0].to_dict())) + return [ocloudModel.StxGenericModel( + ResourceTypeEnum.DMS, + self._k8sconverter(k8sres), self._k8shasher(k8sres)) + for k8sres in k8sclusters if k8sres] def getK8sDetail(self, name) -> ocloudModel.StxGenericModel: - k8scluster = self.stxclient.kube_cluster.get(name) + if not name: + k8sclusters = self.stxclient.kube_cluster.list() + # logger.debug("k8sresources[0]:" + str(k8sclusters[0].to_dict())) + k8scluster = k8sclusters.pop() + else: + k8scluster = self.stxclient.kube_cluster.get(name) + + if not k8scluster: + return None logger.debug("k8sresource:" + str(k8scluster.to_dict())) - return ocloudModel.StxGenericModel(self._k8sconverter(k8scluster)) + return ocloudModel.StxGenericModel( + ResourceTypeEnum.DMS, + self._k8sconverter(k8scluster), self._k8shasher(k8scluster)) def getCpuList(self, hostid) -> List[ocloudModel.StxGenericModel]: cpulist = self.stxclient.icpu.list(hostid) - return [ocloudModel.StxGenericModel(self._cpuconverter(cpures)) - for cpures in cpulist if cpures] + return [ocloudModel.StxGenericModel( + ResourceTypeEnum.OCLOUD, + self._cpuconverter(cpures)) for cpures in cpulist if cpures] def getCpu(self, id) -> ocloudModel.StxGenericModel: cpuinfo = self.stxclient.icpu.get(id) - return ocloudModel.StxGenericModel(self._cpuconverter(cpuinfo)) + return ocloudModel.StxGenericModel( + ResourceTypeEnum.OCLOUD, self._cpuconverter(cpuinfo)) def _getIsystems(self): return self.stxclient.isystem.list() @@ -167,12 +184,17 @@ class StxSaClientImp(object): return cpu @staticmethod - def _k8sconverter(host): - setattr(host, "name", host.cluster_name) - setattr(host, "uuid", - uuid.uuid3(uuid.NAMESPACE_URL, host.cluster_name)) - setattr(host, 'updated_at', None) - setattr(host, 'created_at', None) + def _k8sconverter(cluster): + setattr(cluster, "name", cluster.cluster_name) + setattr(cluster, "uuid", + uuid.uuid3(uuid.NAMESPACE_URL, cluster.cluster_name)) + setattr(cluster, 'updated_at', None) + setattr(cluster, 'created_at', None) logger.debug("k8s cluster name/uuid:" + - host.name + "/" + str(host.uuid)) - return host + cluster.name + "/" + str(cluster.uuid)) + return cluster + + @staticmethod + def _k8shasher(cluster): + return str(hash((cluster.cluster_name, + cluster.cluster_api_endpoint, cluster.admin_user))) diff --git a/o2ims/adapter/clients/orm_stx.py b/o2ims/adapter/clients/orm_stx.py index 4b825c4..b8c1523 100644 --- a/o2ims/adapter/clients/orm_stx.py +++ b/o2ims/adapter/clients/orm_stx.py @@ -17,37 +17,50 @@ import logging from sqlalchemy import ( Table, - # MetaData, + MetaData, Column, # Integer, String, # Date, DateTime, + # engine, # ForeignKey, # event, + Enum ) from sqlalchemy.orm import mapper +# from sqlalchemy.sql.sqltypes import Integer # from sqlalchemy.sql.expression import true from o2ims.domain import stx_object as ocloudModel -from o2ims.adapter.orm import metadata +# from o2ims.adapter.orm import metadata +from o2ims.service.unit_of_work import AbstractUnitOfWork +from o2ims.adapter.unit_of_work import SqlAlchemyUnitOfWork +from o2ims.domain.resource_type import ResourceTypeEnum logger = logging.getLogger(__name__) -# metadata = MetaData() +metadata = MetaData() stxobject = Table( "stxcache", metadata, Column("id", String(255), primary_key=True), + Column("type", Enum(ResourceTypeEnum)), Column("name", String(255)), Column("updatetime", DateTime), Column("createtime", DateTime), - Column("content", String(255)) + Column("hash", String(255)), + Column("content", String) ) -def start_o2ims_stx_mappers(): +def start_o2ims_stx_mappers(uow: AbstractUnitOfWork = SqlAlchemyUnitOfWork()): logger.info("Starting O2 IMS Stx mappers") mapper(ocloudModel.StxGenericModel, stxobject) + + with uow: + engine1 = uow.session.get_bind() + metadata.create_all(engine1) + uow.commit() diff --git a/o2ims/adapter/ocloud_repository.py b/o2ims/adapter/ocloud_repository.py index ca90209..32dced9 100644 --- a/o2ims/adapter/ocloud_repository.py +++ b/o2ims/adapter/ocloud_repository.py @@ -12,46 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. -import abc -from typing import List, Set +from typing import List # from o2ims.adapter import orm from o2ims.domain import ocloud - - -class OcloudRepository(abc.ABC): - def __init__(self): - self.seen = set() # type: Set[ocloud.Ocloud] - - def add(self, ocloud: ocloud.Ocloud): - self._add(ocloud) - self.seen.add(ocloud) - - def get(self, ocloudid) -> ocloud.Ocloud: - ocloud = self._get(ocloudid) - if ocloud: - self.seen.add(ocloud) - return ocloud - - def list(self) -> List[ocloud.Ocloud]: - return self._list() - - def update(self, ocloud: ocloud.Ocloud): - self._update(ocloud) - - # def update_fields(self, ocloudid: str, updatefields: dict): - # self._update(ocloudid, updatefields) - - @abc.abstractmethod - def _add(self, ocloud: ocloud.Ocloud): - raise NotImplementedError - - @abc.abstractmethod - def _get(self, ocloudid) -> ocloud.Ocloud: - raise NotImplementedError - - @abc.abstractmethod - def _update(self, ocloud: ocloud.Ocloud): - raise NotImplementedError +from o2ims.domain.ocloud_repo import OcloudRepository class OcloudSqlAlchemyRepository(OcloudRepository): @@ -68,7 +32,8 @@ class OcloudSqlAlchemyRepository(OcloudRepository): oCloudId=ocloudid).first() def _list(self) -> List[ocloud.Ocloud]: - return self.session.query() + return self.session.query(ocloud.Ocloud).order_by( + ocloud.Ocloud.name).all() def _update(self, ocloud: ocloud.Ocloud): self.session.add(ocloud) diff --git a/o2ims/adapter/stx_repository.py b/o2ims/adapter/stx_repository.py new file mode 100644 index 0000000..741aa11 --- /dev/null +++ b/o2ims/adapter/stx_repository.py @@ -0,0 +1,39 @@ +# Copyright (C) 2021 Wind River Systems, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import List +# from o2ims.adapter import orm +from o2ims.domain.stx_object import StxGenericModel +from o2ims.domain.stx_repo import StxObjectRepository +from o2ims.domain.resource_type import ResourceTypeEnum + + +class StxObjectSqlAlchemyRepository(StxObjectRepository): + def __init__(self, session): + super().__init__() + self.session = session + + def _add(self, stx_obj: StxGenericModel): + self.session.add(stx_obj) + + def _get(self, stx_obj_id) -> StxGenericModel: + return self.session.query(StxGenericModel).filter_by( + id=stx_obj_id).first() + + def _list(self, type: ResourceTypeEnum) -> List[StxGenericModel]: + return self.session.query(StxGenericModel).filter_by( + type=type).order_by(StxGenericModel.updatetime.desc()).all() + + def _update(self, stx_obj: StxGenericModel): + self.session.add(stx_obj) diff --git a/o2ims/adapter/unit_of_work.py b/o2ims/adapter/unit_of_work.py new file mode 100644 index 0000000..c958ce2 --- /dev/null +++ b/o2ims/adapter/unit_of_work.py @@ -0,0 +1,53 @@ +# Copyright (C) 2021 Wind River Systems, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# pylint: disable=attribute-defined-outside-init +from __future__ import annotations +from sqlalchemy import create_engine +from sqlalchemy.orm import sessionmaker +from sqlalchemy.orm.session import Session + +from o2ims import config +from o2ims.adapter.ocloud_repository import OcloudSqlAlchemyRepository +from o2ims.adapter.stx_repository import StxObjectSqlAlchemyRepository +from o2ims.service.unit_of_work import AbstractUnitOfWork + + +DEFAULT_SESSION_FACTORY = sessionmaker( + bind=create_engine( + config.get_postgres_uri(), + isolation_level="REPEATABLE READ", + ) +) + + +class SqlAlchemyUnitOfWork(AbstractUnitOfWork): + def __init__(self, session_factory=DEFAULT_SESSION_FACTORY): + self.session_factory = session_factory + + def __enter__(self): + self.session = self.session_factory() # type: Session + self.oclouds = OcloudSqlAlchemyRepository(self.session) + self.stxobjects = StxObjectSqlAlchemyRepository(self.session) + return super().__enter__() + + def __exit__(self, *args): + super().__exit__(*args) + self.session.close() + + def _commit(self): + self.session.commit() + + def rollback(self): + self.session.rollback() diff --git a/o2ims/bootstrap.py b/o2ims/bootstrap.py index 524f325..55fc99d 100644 --- a/o2ims/bootstrap.py +++ b/o2ims/bootstrap.py @@ -19,11 +19,13 @@ from o2ims.adapter.notifications import AbstractNotifications,\ SmoO2Notifications from o2ims.service import handlers, messagebus, unit_of_work +from o2ims.adapter.unit_of_work import SqlAlchemyUnitOfWork +from o2ims.adapter.clients import orm_stx def bootstrap( start_orm: bool = True, - uow: unit_of_work.AbstractUnitOfWork = unit_of_work.SqlAlchemyUnitOfWork(), + uow: unit_of_work.AbstractUnitOfWork = SqlAlchemyUnitOfWork(), notifications: AbstractNotifications = None, publish: Callable = redis_eventpublisher.publish, ) -> messagebus.MessageBus: @@ -33,6 +35,7 @@ def bootstrap( if start_orm: orm.start_o2ims_mappers() + orm_stx.start_o2ims_stx_mappers(uow) dependencies = {"uow": uow, "notifications": notifications, "publish": publish} diff --git a/o2ims/domain/ocloud_repo.py b/o2ims/domain/ocloud_repo.py new file mode 100644 index 0000000..2c486bd --- /dev/null +++ b/o2ims/domain/ocloud_repo.py @@ -0,0 +1,53 @@ +# Copyright (C) 2021 Wind River Systems, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import abc +from typing import List, Set +from o2ims.domain import ocloud + + +class OcloudRepository(abc.ABC): + def __init__(self): + self.seen = set() # type: Set[ocloud.Ocloud] + + def add(self, ocloud: ocloud.Ocloud): + self._add(ocloud) + self.seen.add(ocloud) + + def get(self, ocloudid) -> ocloud.Ocloud: + ocloud = self._get(ocloudid) + if ocloud: + self.seen.add(ocloud) + return ocloud + + def list(self) -> List[ocloud.Ocloud]: + return self._list() + + def update(self, ocloud: ocloud.Ocloud): + self._update(ocloud) + + # def update_fields(self, ocloudid: str, updatefields: dict): + # self._update(ocloudid, updatefields) + + @abc.abstractmethod + def _add(self, ocloud: ocloud.Ocloud): + raise NotImplementedError + + @abc.abstractmethod + def _get(self, ocloudid) -> ocloud.Ocloud: + raise NotImplementedError + + @abc.abstractmethod + def _update(self, ocloud: ocloud.Ocloud): + raise NotImplementedError diff --git a/o2ims/domain/resource_type.py b/o2ims/domain/resource_type.py index 72f0db0..d95eb18 100644 --- a/o2ims/domain/resource_type.py +++ b/o2ims/domain/resource_type.py @@ -2,6 +2,9 @@ from enum import Enum class ResourceTypeEnum(Enum): - PSERVER = 1 - PSERVER_CPU = 2 - PSERVER_RAM = 3 + OCLOUD = 1, + RESOURCE_POOL = 2, + DMS = 3, + PSERVER = 11 + PSERVER_CPU = 12 + PSERVER_RAM = 13 diff --git a/o2ims/domain/stx_object.py b/o2ims/domain/stx_object.py index 09d1a16..36ce349 100644 --- a/o2ims/domain/stx_object.py +++ b/o2ims/domain/stx_object.py @@ -13,40 +13,44 @@ # limitations under the License. # from dataclasses import dataclass -# import datetime +import datetime import json +from o2ims.domain.resource_type import ResourceTypeEnum +import logging +logger = logging.getLogger(__name__) + class MismatchedModel(Exception): pass class StxGenericModel: - def __init__(self, api_response: dict = None) -> None: + def __init__(self, type: ResourceTypeEnum, + api_response: dict = None, content_hash=None) -> None: if api_response: self.id = api_response.uuid - self.content = json.dumps(api_response.to_dict()) - self.updatetime = api_response.updated_at - self.createtime = api_response.created_at + self.type = type + self.updatetime = datetime.datetime.strptime( + api_response.updated_at.split('.')[0], "%Y-%m-%dT%H:%M:%S") \ + if api_response.updated_at else None + self.createtime = datetime.datetime.strptime( + api_response.created_at.split('.')[0], "%Y-%m-%dT%H:%M:%S") \ + if api_response.created_at else None self.name = api_response.name + self.hash = content_hash if content_hash \ + else str(hash((self.id, self.updatetime))) + self.content = json.dumps(api_response.to_dict()) def is_outdated(self, newmodel) -> bool: - return self.updatetime < newmodel.updatetime + # return self.updatetime < newmodel.updatetime + # logger.warning("hash1: " + self.hash + " vs hash2: " + newmodel.hash) + return self.hash != newmodel.hash def update_by(self, newmodel) -> None: if self.id != newmodel.id: raise MismatchedModel("Mismatched model") self.name = newmodel.name - - self.content = newmodel.content self.createtime = newmodel.createtime self.updatetime = newmodel.updatetime - - -class StxK8sClusterModel(StxGenericModel): - def __init__(self, api_response: dict = None) -> None: - super().__init__(api_response=api_response) - - def is_outdated(self, newmodel) -> bool: - # never outdated since lack of such evidence - return False + self.content = newmodel.content diff --git a/o2ims/domain/stx_repo.py b/o2ims/domain/stx_repo.py new file mode 100644 index 0000000..cb74480 --- /dev/null +++ b/o2ims/domain/stx_repo.py @@ -0,0 +1,58 @@ +# Copyright (C) 2021 Wind River Systems, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import abc +from typing import List, Set +from o2ims.domain.stx_object import StxGenericModel +from o2ims.domain.resource_type import ResourceTypeEnum + + +class StxObjectRepository(abc.ABC): + def __init__(self): + self.seen = set() # type: Set[StxGenericModel] + + def add(self, stx_obj: StxGenericModel): + self._add(stx_obj) + self.seen.add(stx_obj) + + def get(self, stx_obj_id) -> StxGenericModel: + stx_obj = self._get(stx_obj_id) + if stx_obj: + self.seen.add(stx_obj) + return stx_obj + + def list(self, type: ResourceTypeEnum) -> List[StxGenericModel]: + return self._list(type) + + def update(self, stx_obj: StxGenericModel): + self._update(stx_obj) + + # def update_fields(self, stx_obj_id: str, updatefields: dict): + # self._update(stx_obj_id, updatefields) + + @abc.abstractmethod + def _add(self, stx_obj: StxGenericModel): + raise NotImplementedError + + @abc.abstractmethod + def _get(self, stx_obj_id) -> StxGenericModel: + raise NotImplementedError + + @abc.abstractmethod + def _update(self, stx_obj: StxGenericModel): + raise NotImplementedError + + @abc.abstractmethod + def _list(self, type: ResourceTypeEnum): + raise NotImplementedError diff --git a/o2ims/entrypoints/flask_application.py b/o2ims/entrypoints/flask_application.py index bb791bc..8965a30 100644 --- a/o2ims/entrypoints/flask_application.py +++ b/o2ims/entrypoints/flask_application.py @@ -19,12 +19,10 @@ from flask import Flask, jsonify # from o2ims.service.handlers import InvalidResourceType from o2ims import bootstrap, config from o2ims.views import ocloud_view -from o2ims.service.watcher.executor import start_watchers app = Flask(__name__) bus = bootstrap.bootstrap() apibase = config.get_o2ims_api_base() -start_watchers() @app.route(apibase, methods=["GET"]) diff --git a/o2ims/entrypoints/o2ims-watcher-entry.sh b/o2ims/entrypoints/o2ims-watcher-entry.sh new file mode 100644 index 0000000..f942908 --- /dev/null +++ b/o2ims/entrypoints/o2ims-watcher-entry.sh @@ -0,0 +1,8 @@ +#!/bin/sh + +# pip install -e /src +# python /o2ims/entrypoints/resource_watcher.py + +cp -r /o2ims/* /src/o2ims +pip install -e /src +python /o2ims/entrypoints/resource_watcher.py diff --git a/o2ims/service/watcher/executor.py b/o2ims/entrypoints/resource_watcher.py similarity index 50% rename from o2ims/service/watcher/executor.py rename to o2ims/entrypoints/resource_watcher.py index 607bdc9..a3292df 100644 --- a/o2ims/service/watcher/executor.py +++ b/o2ims/entrypoints/resource_watcher.py @@ -15,31 +15,55 @@ import cotyledon from o2ims.service.watcher.worker import PollWorker -from o2ims.service.watcher.base import OcloudWather +from o2ims.service.watcher.base import OcloudWatcher from o2ims.service.watcher.base import DmsWatcher +# from o2ims.service.client.base_client import BaseClient +from o2ims.adapter.clients.ocloud_sa_client import StxSaDmsClient +from o2ims.adapter.clients.ocloud_sa_client import StxSaOcloudClient + +from o2ims import bootstrap +# from o2ims import config +# import redis import logging logger = logging.getLogger(__name__) +# r = redis.Redis(**config.get_redis_host_and_port()) + class WatcherService(cotyledon.Service): - def __init__(self, worker_id, args) -> None: + def __init__(self, worker_id, args=None) -> None: super().__init__(worker_id) self.args = args + self.bus = bootstrap.bootstrap() self.worker = PollWorker() + # self.stxrepo = self.bus.uow.stxobjects + # tbd: 1 client per resource pool + # self.client = StxSaOcloudClient() def run(self): try: - self.worker.add_watcher(OcloudWather()) - self.worker.add_watcher(DmsWatcher()) + self.worker.add_watcher(OcloudWatcher(StxSaOcloudClient(), + self.bus.uow)) + self.worker.add_watcher(DmsWatcher(StxSaDmsClient(), + self.bus.uow)) self.worker.start() except Exception as ex: - logger.warning(ex.message) + logger.warning("WorkerService Exception:" + str(ex)) finally: self.worker.stop() -def start_watchers(sm=None): +def start_watchers(sm: cotyledon.ServiceManager = None): watchersm = sm if sm else cotyledon.ServiceManager() watchersm.add(WatcherService, workers=1, args=()) - return watchersm + watchersm.run() + + +def main(): + logger.info("Resource watcher starting") + start_watchers() + + +if __name__ == "__main__": + main() diff --git a/o2ims/service/unit_of_work.py b/o2ims/service/unit_of_work.py index 40e0f76..a06e9e0 100644 --- a/o2ims/service/unit_of_work.py +++ b/o2ims/service/unit_of_work.py @@ -15,16 +15,14 @@ # pylint: disable=attribute-defined-outside-init from __future__ import annotations import abc -from sqlalchemy import create_engine -from sqlalchemy.orm import sessionmaker -from sqlalchemy.orm.session import Session -from o2ims import config -from o2ims.adapter import ocloud_repository +from o2ims.domain.ocloud_repo import OcloudRepository +from o2ims.domain.stx_repo import StxObjectRepository class AbstractUnitOfWork(abc.ABC): - oclouds: ocloud_repository.OcloudRepository + oclouds: OcloudRepository + stxobjects: StxObjectRepository def __enter__(self): return self @@ -47,32 +45,3 @@ class AbstractUnitOfWork(abc.ABC): @abc.abstractmethod def rollback(self): raise NotImplementedError - - -DEFAULT_SESSION_FACTORY = sessionmaker( - bind=create_engine( - config.get_postgres_uri(), - isolation_level="REPEATABLE READ", - ) -) - - -class SqlAlchemyUnitOfWork(AbstractUnitOfWork): - def __init__(self, session_factory=DEFAULT_SESSION_FACTORY): - self.session_factory = session_factory - - def __enter__(self): - self.session = self.session_factory() # type: Session - self.oclouds = ocloud_repository\ - .OcloudSqlAlchemyRepository(self.session) - return super().__enter__() - - def __exit__(self, *args): - super().__exit__(*args) - self.session.close() - - def _commit(self): - self.session.commit() - - def rollback(self): - self.session.rollback() diff --git a/o2ims/service/watcher/base.py b/o2ims/service/watcher/base.py index 26c67da..c3ff3d4 100644 --- a/o2ims/service/watcher/base.py +++ b/o2ims/service/watcher/base.py @@ -12,9 +12,13 @@ # See the License for the specific language governing permissions and # limitations under the License. +from o2ims.domain.resource_type import ResourceTypeEnum from o2ims.service.client.base_client import BaseClient from o2ims.domain.stx_object import StxGenericModel -from o2ims.adapter.ocloud_repository import OcloudRepository +from o2ims.service.unit_of_work import AbstractUnitOfWork + +import logging +logger = logging.getLogger(__name__) class InvalidOcloudState(Exception): @@ -39,11 +43,11 @@ class BaseWatcher(object): raise NotImplementedError -class OcloudWather(BaseWatcher): +class OcloudWatcher(BaseWatcher): def __init__(self, ocloud_client: BaseClient, - repo: OcloudRepository) -> None: + uow: AbstractUnitOfWork) -> None: super().__init__(ocloud_client) - self._repo = repo + self._uow = uow def _targetname(self): return "ocloud" @@ -54,38 +58,105 @@ class OcloudWather(BaseWatcher): self._compare_and_update(ocloudmodel) def _compare_and_update(self, ocloudmodel: StxGenericModel) -> bool: - # localmodel = self._repo.get(ocloudmodel.id) - oclouds = self._repo.list() - if len(oclouds) > 1: - raise InvalidOcloudState("More than 1 ocloud is found") - if len(oclouds) == 0: - self._repo.add(ocloudmodel) - else: - localmodel = oclouds.pop() - if localmodel.is_outdated(ocloudmodel): - localmodel.update_by(ocloudmodel) - self._repo.update(localmodel) + with self._uow: + # localmodel = self._uow.stxobjects.get(str(ocloudmodel.id)) + oclouds = self._uow.stxobjects.list(ResourceTypeEnum.OCLOUD) + if len(oclouds) > 1: + raise InvalidOcloudState("More than 1 ocloud is found") + if len(oclouds) == 0: + logger.info("add ocloud:" + ocloudmodel.name + + " update_at: " + str(ocloudmodel.updatetime) + + " id: " + str(ocloudmodel.id) + + " hash: " + str(ocloudmodel.hash)) + self._uow.stxobjects.add(ocloudmodel) + else: + localmodel = oclouds.pop() + if localmodel.is_outdated(ocloudmodel): + logger.info("update ocloud:" + ocloudmodel.name + + " update_at: " + str(ocloudmodel.updatetime) + + " id: " + str(ocloudmodel.id) + + " hash: " + str(ocloudmodel.hash)) + localmodel.update_by(ocloudmodel) + self._uow.stxobjects.update(localmodel) + self._uow.commit() class DmsWatcher(BaseWatcher): - def __init__(self, client: BaseClient) -> None: + def __init__(self, client: BaseClient, + uow: AbstractUnitOfWork) -> None: super().__init__(client) + self._uow = uow def _targetname(self): return "dms" + def _probe(self): + ocloudmodel = self._client.get(None) + if ocloudmodel: + self._compare_and_update(ocloudmodel) + + def _compare_and_update(self, newmodel: StxGenericModel) -> bool: + with self._uow: + # localmodel = self._uow.stxobjects.get(ocloudmodel.id) + localmodel = self._uow.stxobjects.get(str(newmodel.id)) + if not localmodel: + logger.info("add dms:" + newmodel.name) + self._uow.stxobjects.add(newmodel) + elif localmodel.is_outdated(newmodel): + logger.info("update dms:" + newmodel.name) + localmodel.update_by(newmodel) + self._uow.stxobjects.update(newmodel) + self._uow.commit() + class ResourcePoolWatcher(BaseWatcher): - def __init__(self) -> None: + def __init__(self, client: BaseClient, + uow: AbstractUnitOfWork) -> None: super().__init__() + self._uow = uow def _targetname(self): - return "ocloud" + return "resourcepool" + + def _probe(self): + ocloudmodel = self._client.get(None) + if ocloudmodel: + logger.info("detect ocloudmodel:" + ocloudmodel.name) + self._compare_and_update(ocloudmodel) + + def _compare_and_update(self, newmodel: StxGenericModel) -> bool: + with self._uow: + # localmodel = self._uow.stxobjects.get(ocloudmodel.id) + localmodel = self._uow.stxobjects.get(str(newmodel.id)) + if not localmodel: + self._uow.stxobjects.add(newmodel) + elif localmodel.is_outdated(newmodel): + localmodel.update_by(newmodel) + self._uow.stxobjects.update(newmodel) + self._uow.commit() class ResourceWatcher(BaseWatcher): - def __init__(self) -> None: + def __init__(self, client: BaseClient, + uow: AbstractUnitOfWork) -> None: super().__init__() + self._uow = uow def _targetname(self): return "resource" + + def _probe(self): + ocloudmodel = self._client.get(None) + if ocloudmodel: + self._compare_and_update(ocloudmodel) + + def _compare_and_update(self, newmodel: StxGenericModel) -> bool: + with self._uow: + # localmodel = self._repo.get(ocloudmodel.id) + localmodel = self._uow.stxobjects.get(str(newmodel.id)) + if not localmodel: + self._uow.stxobjects.add(newmodel) + elif localmodel.is_outdated(newmodel): + localmodel.update_by(newmodel) + self._uow.stxobjects.update(newmodel) + self._uow.commit() diff --git a/o2ims/service/watcher/worker.py b/o2ims/service/watcher/worker.py index a348074..11bdfdc 100644 --- a/o2ims/service/watcher/worker.py +++ b/o2ims/service/watcher/worker.py @@ -46,7 +46,7 @@ class PollWorker(object): logger.debug("about to probe:"+w) self.watchers[w].probe() except Exception as ex: - logger.warning(ex.message) + logger.warning("Worker:" + w + " raises exception:" + str(ex)) continue self.schedinstance.enter(self.schedinterval, 1, self._repeat) @@ -59,6 +59,3 @@ class PollWorker(object): def stop(self): self._stopped = True - - -defaultworker = PollWorker() diff --git a/o2ims/views/ocloud_view.py b/o2ims/views/ocloud_view.py index 7005b5a..a42946d 100644 --- a/o2ims/views/ocloud_view.py +++ b/o2ims/views/ocloud_view.py @@ -15,7 +15,7 @@ from o2ims.service import unit_of_work -def ocloud_one(ocloudid: str, uow: unit_of_work.SqlAlchemyUnitOfWork): +def ocloud_one(ocloudid: str, uow: unit_of_work.AbstractUnitOfWork): with uow: results = uow.session.execute( """ @@ -26,7 +26,7 @@ def ocloud_one(ocloudid: str, uow: unit_of_work.SqlAlchemyUnitOfWork): return dict(results[0]) if len(results) > 0 else None -def oclouds(uow: unit_of_work.SqlAlchemyUnitOfWork): +def oclouds(uow: unit_of_work.AbstractUnitOfWork): with uow: results = uow.session.execute( """ diff --git a/requirements-stx.txt b/requirements-stx.txt new file mode 100644 index 0000000..6364e51 --- /dev/null +++ b/requirements-stx.txt @@ -0,0 +1,2 @@ +-e git+https://opendev.org/starlingx/distcloud-client.git@master#egg=distributedcloud-client&subdirectory=distributedcloud-client +-e git+https://opendev.org/starlingx/config.git@master#egg=cgtsclient&subdirectory=sysinv/cgts-client/cgts-client# diff --git a/requirements.txt b/requirements.txt index ca71bc2..3a14faa 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,5 +6,9 @@ cotyledon Cython>=3.0a1 + +httplib2 +babel +PrettyTable<0.8,>=0.7.2 # -e git+https://opendev.org/starlingx/distcloud-client.git@master#egg=distributedcloud-client&subdirectory=distributedcloud-client # -e git+https://opendev.org/starlingx/config.git@master#egg=cgtsclient&subdirectory=sysinv/cgts-client/cgts-client# diff --git a/tests/integration-ocloud/test_watcher_w_stx_sa.py b/tests/integration-ocloud/test_watcher_w_stx_sa.py new file mode 100644 index 0000000..d722ee7 --- /dev/null +++ b/tests/integration-ocloud/test_watcher_w_stx_sa.py @@ -0,0 +1,30 @@ +# Copyright (C) 2021 Wind River Systems, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from multiprocessing.queues import Queue +import pytest +from o2ims.entrypoints.resource_watcher import start_watchers +from multiprocessing import Process +from multiprocessing import Pipe +# pipe = Pipe() +# q = Queue() +import time +# pytestmark = pytest.mark.usefixtures("mappers") + + +def test_watcher_service(): + testedprocess = Process(target=start_watchers, args=()) + testedprocess.start() + time.sleep(10) + testedprocess.terminate() diff --git a/tests/integration/test_clientdriver_fake_stx_sa.py b/tests/integration/test_clientdriver_fake_stx_sa.py index 8f262b8..25bc3ad 100644 --- a/tests/integration/test_clientdriver_fake_stx_sa.py +++ b/tests/integration/test_clientdriver_fake_stx_sa.py @@ -23,6 +23,7 @@ import uuid import json from o2ims.adapter.clients.ocloud_sa_client import StxSaOcloudClient from o2ims.domain import stx_object as ocloudModel +from o2ims.domain.resource_type import ResourceTypeEnum # pytestmark = pytest.mark.usefixtures("mappers") @@ -32,7 +33,7 @@ class FakeStxSaClientImp(object): super().__init__() def getInstanceInfo(self) -> ocloudModel.StxGenericModel: - model = ocloudModel.StxGenericModel() + model = ocloudModel.StxGenericModel(ResourceTypeEnum.OCLOUD) model.id = uuid.uuid4() model.name = "stx1" model.updatetime = datetime.now diff --git a/tests/unit/test_ocloud.py b/tests/unit/test_ocloud.py index baff737..e70d188 100644 --- a/tests/unit/test_ocloud.py +++ b/tests/unit/test_ocloud.py @@ -12,7 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import pytest from o2ims.domain import ocloud from o2ims import config import uuid diff --git a/tests/unit/test_watcher.py b/tests/unit/test_watcher.py index 4cdfb76..76f5d78 100644 --- a/tests/unit/test_watcher.py +++ b/tests/unit/test_watcher.py @@ -16,26 +16,29 @@ import time from datetime import datetime import json from typing import List +from o2ims.domain.resource_type import ResourceTypeEnum from o2ims.service.client.base_client import BaseClient -import pytest from o2ims.domain import ocloud from o2ims import config import uuid -from o2ims.service.watcher.base import BaseWatcher, OcloudWather +from o2ims.service.watcher.base import BaseWatcher, OcloudWatcher from o2ims.domain import stx_object as ocloudModel from o2ims.adapter.ocloud_repository import OcloudRepository +from o2ims.domain.stx_repo import StxObjectRepository from o2ims.service.watcher import worker -from o2ims.service.watcher.executor import start_watchers +from o2ims.service.unit_of_work import AbstractUnitOfWork + class FakeOcloudClient(BaseClient): def __init__(self): super().__init__() - fakeCloud = ocloudModel.StxGenericModel() + fakeCloud = ocloudModel.StxGenericModel(ResourceTypeEnum.OCLOUD) fakeCloud.id = uuid.uuid4() fakeCloud.name = 'stx1' fakeCloud.content = json.dumps({}) fakeCloud.createtime = datetime.now() - fakeCloud.updatetime = datetime.now + fakeCloud.updatetime = datetime.now() + fakeCloud.hash = str(hash((fakeCloud.id, fakeCloud.updatetime))) self.fakeCloud = fakeCloud def _get(self, id) -> ocloudModel.StxGenericModel: @@ -44,6 +47,7 @@ class FakeOcloudClient(BaseClient): def _list(self): return [self.fakeCloud] + class FakeOcloudRepo(OcloudRepository): def __init__(self): super().__init__() @@ -65,15 +69,65 @@ class FakeOcloudRepo(OcloudRepository): ocloud1 = filtered.pop() ocloud1.update_by(ocloud) + + +class FakeStxObjRepo(StxObjectRepository): + def __init__(self): + super().__init__() + self.oclouds = [] + + def _add(self, ocloud: ocloud.Ocloud): + self.oclouds.append(ocloud) + + def _get(self, ocloudid) -> ocloud.Ocloud: + filtered = [o for o in self.oclouds if o.id == ocloudid] + return filtered.pop() + + def _list(self, type: ResourceTypeEnum) -> List[ocloud.Ocloud]: + return [x for x in self.oclouds] + + def _update(self, ocloud: ocloud.Ocloud): + filtered = [o for o in self.oclouds if o.id == ocloud.id] + assert len(filtered) == 1 + ocloud1 = filtered.pop() + ocloud1.update_by(ocloud) + + +class FakeUnitOfWork(AbstractUnitOfWork): + def __init__(self): + pass + + def __enter__(self): + # self.session = self.session_factory() # type: Session + # self.oclouds = OcloudSqlAlchemyRepository(self.session) + self.stxobjects = FakeStxObjRepo() + return super().__enter__() + + def __exit__(self, *args): + super().__exit__(*args) + # self.session.close() + + def _commit(self): + pass + # self.session.commit() + + def rollback(self): + pass + # self.session.rollback() + + def test_probe_new_ocloud(): - fakeRepo = FakeOcloudRepo() + # fakeRepo = FakeOcloudRepo() + fakeuow = FakeUnitOfWork() fakeClient = FakeOcloudClient() - ocloudwatcher = OcloudWather(fakeClient, fakeRepo) + ocloudwatcher = OcloudWatcher(fakeClient, fakeuow) ocloudwatcher.probe() - assert len(fakeRepo.oclouds) == 1 - assert fakeRepo.oclouds[0].name == "stx1" + assert len(fakeuow.stxobjects.oclouds) == 1 + assert fakeuow.stxobjects.oclouds[0].name == "stx1" + -def test_default_worker(): +def test_watchers_worker(): + testedworker = worker.PollWorker() class FakeOCloudWatcher(BaseWatcher): def __init__(self, client: BaseClient, @@ -85,24 +139,26 @@ def test_default_worker(): def _targetname(self): return "fakeocloudwatcher" - + def _probe(self): self.fakeOcloudWatcherCounter += 1 # hacking to stop the blocking sched task if self.fakeOcloudWatcherCounter > 2: - worker.defaultworker.stop() + testedworker.stop() + + # fakeRepo = FakeOcloudRepo() + fakeuow = FakeUnitOfWork() - fakeRepo = FakeOcloudRepo() fakeClient = FakeOcloudClient() - fakewatcher = FakeOCloudWatcher(fakeClient, fakeRepo) + fakewatcher = FakeOCloudWatcher(fakeClient, fakeuow) - worker.defaultworker.set_interval(1) - worker.defaultworker.add_watcher(fakewatcher) + testedworker.set_interval(1) + testedworker.add_watcher(fakewatcher) assert fakewatcher.fakeOcloudWatcherCounter == 0 count1 = fakewatcher.fakeOcloudWatcherCounter - worker.defaultworker.start() + testedworker.start() time.sleep(20) assert fakewatcher.fakeOcloudWatcherCounter > count1