-# Copyright (C) 2021 Wind River Systems, Inc.
+# Copyright (C) 2021-2025 Wind River Systems, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
from o2common.config import config
from o2common.service.unit_of_work import AbstractUnitOfWork
-from o2ims.adapter import ocloud_repository, alarm_repository
+from o2ims.adapter import ocloud_repository, alarm_repository, \
+ performance_repository
from o2dms.adapter import dms_repository
from o2common.helper import o2logging
.AlarmProbableCauseSqlAlchemyRepository(self.session)
self.alarm_service_config = alarm_repository\
.AlarmServiceConfigurationSqlAlchemyRepository(self.session)
+ self.measurement_jobs = performance_repository\
+ .MeasurementJobSqlAlchemyRepository(self.session)
return super().__enter__()
-# Copyright (C) 2021-2024 Wind River Systems, Inc.
+# Copyright (C) 2021-2025 Wind River Systems, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
return get_root_api_base() + 'o2ims-infrastructureMonitoring'
+def get_o2ims_performance_api_base():
+ return get_root_api_base() + 'o2ims-infrastructurePerformance'
+
+
def get_o2dms_api_base():
return get_root_api_base() + "o2dms/v1"
-# Copyright (C) 2021 Wind River Systems, Inc.
+# Copyright (C) 2021-2025 Wind River Systems, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# Date,
DateTime,
ForeignKey,
- # Boolean,
+ Boolean,
# engine,
# event,
exc,
from o2ims.domain import alarm_obj as alarmModel
from o2ims.domain.resource_type import ResourceTypeEnum, ResourceKindEnum
# from o2ims.domain.alarm_obj import AlarmLastChangeEnum, PerceivedSeverityEnum
+from o2ims.domain import performance_obj as perfModel
from o2common.helper import o2logging
logger = o2logging.get_logger(__name__)
Column("retentionPeriod", Integer, default=15)
)
+measurement_job = Table(
+ "measurementJob",
+ metadata,
+ Column("updatetime", DateTime),
+ Column("createtime", DateTime),
+ Column("hash", String(255)),
+ Column("version_number", Integer),
+
+ Column("performanceMeasurementJobId", String(255), primary_key=True),
+ Column("consumerPerformanceJobId", String(255)),
+ Column("state", String(255)), # MeasurementJobState enum
+ Column("collectionInterval", Integer),
+ Column("resourceScopeCriteria", Text), # JSON stored as text
+ Column("measurementSelectionCriteria", Text), # JSON stored as text
+ Column("status", String(255)), # MeasurementJobStatus enum
+ Column("preinstalledJob", Boolean),
+ Column("qualifiedResourceTypes", Text), # JSON array stored as text
+ Column("extensions", Text) # JSON stored as text
+)
+
+measured_resource = Table(
+ "measuredResource",
+ metadata,
+ Column("updatetime", DateTime),
+ Column("createtime", DateTime),
+ Column("hash", String(255)),
+ Column("version_number", Integer),
+
+ Column("id", String(255), primary_key=True),
+ Column("resourceId", String(255)),
+ Column("resourceTypeId", String(255)),
+ Column("measurementJobId", ForeignKey(
+ "measurementJob.performanceMeasurementJobId"))
+)
+
+collected_measurement = Table(
+ "collectedMeasurement",
+ metadata,
+ Column("updatetime", DateTime),
+ Column("createtime", DateTime),
+ Column("hash", String(255)),
+ Column("version_number", Integer),
+
+ Column("id", String(255), primary_key=True),
+ Column("measurementId", String(255)),
+ Column("measurementJobId", ForeignKey(
+ "measurementJob.performanceMeasurementJobId"))
+)
+
@retry((exc.IntegrityError), tries=3, delay=2)
def wait_for_metadata_ready(engine):
)
mapper(subModel.Subscription, subscription)
+ # Performance Monitoring Mappering
+ measured_resource_mapper = mapper(
+ perfModel.MeasuredResource,
+ measured_resource
+ )
+
+ collected_measurement_mapper = mapper(
+ perfModel.CollectedMeasurement,
+ collected_measurement
+ )
+
+ mapper(
+ perfModel.MeasurementJob,
+ measurement_job,
+ properties={
+ "measuredResources": relationship(
+ measured_resource_mapper,
+ cascade="all, delete-orphan"
+ ),
+ "collectedMeasurements": relationship(
+ collected_measurement_mapper,
+ cascade="all, delete-orphan"
+ )
+ }
+ )
+
if engine is not None:
wait_for_metadata_ready(engine)
--- /dev/null
+# Copyright (C) 2024-2025 Wind River Systems, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import abc
+from typing import List, Tuple
+
+from o2ims.domain import performance_obj
+from o2ims.domain.performance_repo import MeasurementJobRepository
+from o2common.helper import o2logging
+logger = o2logging.get_logger(__name__)
+
+
+class MeasurementJobSqlAlchemyRepository(MeasurementJobRepository):
+ def __init__(self, session):
+ super().__init__()
+ self.session = session
+
+ def _add(self, measurement_job: performance_obj.MeasurementJob):
+ self.session.add(measurement_job)
+
+ def _get(self, measurement_job_id) -> performance_obj.MeasurementJob:
+ return self.session.query(performance_obj.MeasurementJob).filter_by(
+ performanceMeasurementJobId=measurement_job_id).first()
+
+ def _list(self, *args, **kwargs) -> Tuple[
+ int, List[performance_obj.MeasurementJob]]:
+ size = kwargs.pop('limit') if 'limit' in kwargs else None
+ offset = kwargs.pop('start') if 'start' in kwargs else 0
+
+ result = self.session.query(performance_obj.MeasurementJob).filter(
+ *args).order_by('performanceMeasurementJobId')
+ count = result.count()
+ if size is not None and size != -1:
+ return (count, result.limit(size).offset(offset))
+ return (count, result)
+
+ def _update(self, measurement_job: performance_obj.MeasurementJob):
+ self.session.merge(measurement_job)
+
+ def _delete(self, measurement_job_id):
+ self.session.query(performance_obj.MeasurementJob).filter_by(
+ performanceMeasurementJobId=measurement_job_id).delete()
--- /dev/null
+# Copyright (C) 2024-2025 Wind River Systems, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import annotations
+from enum import Enum
+from typing import List, Dict
+from o2common.domain.base import AgRoot, Serializer
+
+
+class MeasurementJobState(str, Enum):
+ ACTIVE = "ACTIVE"
+ SUSPENDED = "SUSPENDED"
+ DEPRECATED = "DEPRECATED"
+
+
+class MeasurementJobStatus(str, Enum):
+ RUNNING = "RUNNING"
+ FAILED = "FAILED"
+ DEGRADED = "DEGRADED"
+ IDLE = "IDLE"
+ PENDING_DELETE = "PENDING_DELETE"
+
+
+class MeasuredResource(AgRoot, Serializer):
+ def __init__(self, resource_id: str, resource_type_id: str,
+ time_added: List[str] = None,
+ time_deleted: List[str] = None,
+ is_currently_measured: bool = False) -> None:
+ super().__init__()
+ self.resourceId = resource_id
+ self.resourceTypeId = resource_type_id
+ self.timeAdded = time_added or [] # List of timestamps
+ self.timeDeleted = time_deleted or [] # List of timestamps
+ self.isCurrentlyMeasured = is_currently_measured # Boolean, ReadOnly
+
+
+class CollectedMeasurement(AgRoot, Serializer):
+ def __init__(self, measurement_id: str, resource_type_id: str,
+ performance_measurement_definition_id: str,
+ time_added: List[str],
+ time_deleted: List[str] = None,
+ is_currently_measured: bool = False) -> None:
+ super().__init__()
+ self.measurementId = measurement_id
+ self.resourceTypeId = resource_type_id # ReadOnly, UUID format
+ self.performanceMeasurementDefinitionId = \
+ performance_measurement_definition_id # ReadOnly, UUID format
+ self.timeAdded = time_added # List of timestamps
+ self.timeDeleted = time_deleted or [] # List of timestamps
+ self.isCurrentlyMeasured = is_currently_measured # Boolean, ReadOnly
+
+
+class MeasurementJob(AgRoot, Serializer):
+ def __init__(
+ self,
+ job_id: str,
+ consumer_job_id: str,
+ state: MeasurementJobState,
+ collection_interval: int,
+ measurement_criteria: List[Dict[str, str]],
+ status: MeasurementJobStatus,
+ preinstalled_job: bool,
+ resource_criteria: Dict[str, str] = None,
+ ) -> None:
+ super().__init__()
+ self.performanceMeasurementJobId = job_id
+ self.consumerPerformanceJobId = consumer_job_id
+ self.state = state
+ self.collectionInterval = collection_interval
+ self.resourceScopeCriteria = resource_criteria or {}
+ self.measurementSelectionCriteria = measurement_criteria
+ self.status = status
+ self.preinstalledJob = preinstalled_job
+ self.qualifiedResourceTypes: List[str] = []
+ self.measuredResources: List[MeasuredResource] = []
+ self.collectedMeasurements: List[CollectedMeasurement] = []
+ self.extensions = ''
--- /dev/null
+# Copyright (C) 2024-2025 Wind River Systems, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import abc
+from typing import List, Set, Tuple
+from o2ims.domain import performance_obj as obj
+
+
+class MeasurementJobRepository(abc.ABC):
+ def __init__(self):
+ self.seen = set() # type: Set[obj.MeasurementJob]
+
+ def add(self, measurement_job: obj.MeasurementJob):
+ self._add(measurement_job)
+ self.seen.add(measurement_job)
+
+ def get(self, measurement_job_id) -> obj.MeasurementJob:
+ measurement_job = self._get(measurement_job_id)
+ if measurement_job:
+ self.seen.add(measurement_job)
+ return measurement_job
+
+ def list(self, *args) -> List[obj.MeasurementJob]:
+ return self._list(*args)[1]
+
+ def list_with_count(self, *args, **kwargs) -> \
+ Tuple[int, List[obj.MeasurementJob]]:
+ return self._list(*args, **kwargs)
+
+ def update(self, measurement_job: obj.MeasurementJob):
+ self._update(measurement_job)
+
+ def delete(self, measurement_job_id):
+ self._delete(measurement_job_id)
+
+ @abc.abstractmethod
+ def _add(self, measurement_job: obj.MeasurementJob):
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def _get(self, measurement_job_id) -> obj.MeasurementJob:
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def _list(self, **kwargs) -> Tuple[int, List[obj.MeasurementJob]]:
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def _update(self, measurement_job: obj.MeasurementJob):
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def _delete(self, measurement_job_id):
+ raise NotImplementedError
-# Copyright (C) 2021 Wind River Systems, Inc.
+# Copyright (C) 2021-2025 Wind River Systems, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-from . import api_ns, ocloud_route, alarm_route
+from . import api_ns, ocloud_route, alarm_route, performance_route
from o2common.config import config
def configure_namespace(app):
apiims = config.get_o2ims_api_base()
apimonitoring = config.get_o2ims_monitoring_api_base()
+ apiperformance = config.get_o2ims_performance_api_base()
logger.info(
"Expose the O2 IMS API:{}\n \
Expose Monitoring API: {}".
ocloud_route.configure_api_route()
alarm_route.configure_api_route()
+ performance_route.configure_api_route()
app.add_namespace(api_ns.api_ims_inventory, path=apiims)
app.add_namespace(api_ns.api_ims_monitoring, path=apimonitoring)
+ app.add_namespace(api_ns.api_ims_performance, path=apiperformance)
-# Copyright (C) 2021-2022 Wind River Systems, Inc.
+# Copyright (C) 2021-2025 Wind River Systems, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
"O2IMS-InfrastructureMonitoring",
description='O2 IMS Monitoring related operations.')
+api_ims_performance = O2Namespace(
+ 'O2IMS-InfrastructurePerformance',
+ description='O2 IMS Infrastructure Performance API',
+ validate=True
+)
+
@api_ims_inventory.route('/api_versions')
class InventoryVersion(Resource):
# 'retirementDate': ''
}]
}
+
+
+@api_ims_performance.route('/api_version')
+class PerformanceVersion(Resource):
+ api_version = api_ims_inventory.model(
+ 'PerformanceApiVersionStructure',
+ {
+ 'version': fields.String(
+ required=True,
+ example='1.0.0',
+ description='Identifies a supported version.'
+ )
+ },
+ mask='{version,}'
+ )
+ model = api_ims_inventory.model(
+ "PerformanceAPIVersion",
+ {
+ 'uriPrefix': fields.String(
+ required=True,
+ example='https://128.224.115.36:30205/' +
+ 'o2ims-infrastructurePerformance',
+ description='Specifies the URI prefix for the API'),
+ 'apiVersions': fields.List(
+ fields.Nested(api_version),
+ example=[{'version': '1.0.0'}],
+ description='Version(s) supported for the API ' +
+ 'signaled by the uriPrefix attribute.'),
+ },
+ mask='{uriPrefix,apiVersions}'
+ )
+
+ @api_ims_performance.doc('Get Performance Version')
+ @api_ims_monitoring.marshal_with(model)
+ def get(self):
+ """Get Performance Version"""
+ return {
+ 'uriPrefix': request.base_url.rsplit('/', 1)[0],
+ 'apiVersions': [{
+ 'version': '1.0.0',
+ # 'isDeprecated': 'False',
+ # 'retirementDate': ''
+ }]
+ }
--- /dev/null
+# Copyright (C) 2024-2025 Wind River Systems, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from flask_restx import fields
+from o2common.views.flask_restx_fields import Json2Dict
+from o2ims.views.api_ns import api_ims_performance as api_performance_v1
+
+
+class PerformanceApiV1DTO:
+ api_version = api_performance_v1.model(
+ 'PerformanceV1ApiVersionStructure',
+ {
+ 'version': fields.String(
+ required=True,
+ example='1.0.0',
+ description='Identifies a supported version.'
+ )
+ },
+ mask='{version,}'
+ )
+
+ api_version_info_get = api_performance_v1.model(
+ "PerformanceV1APIVersion",
+ {
+ 'uriPrefix': fields.String(
+ required=True,
+ example='https://192.168.204.1:30205/' +
+ 'o2ims-infrastructurePerformance/v1',
+ description='Specifies the URI prefix for the API'),
+ 'apiVersions': fields.List(
+ fields.Nested(api_version),
+ example=[{'version': '1.0.0'}],
+ description='Version(s) supported for the API ' +
+ 'signaled by the uriPrefix attribute.'),
+ },
+ mask='{uriPrefix,apiVersions}'
+ )
+
+
+class PerformanceDTO:
+ measurement_job_get = api_performance_v1.model(
+ "MeasurementJobGetDto",
+ {
+ 'performanceMeasurementJobId': fields.String(
+ required=True,
+ example='97cc2b01-0e71-4a93-a911-2e87f04d996f',
+ description='Identifier of this instance of Performance ' +
+ 'Meaurement Job within the IMS'),
+ 'consumerPerformanceJobId': fields.String(
+ example='3F20D850-AF4F-A84F-FB5A-0AD585410361',
+ description='Identifier provided by the consumer for its ' +
+ 'purpose of managing performance jobs'),
+ 'state': fields.String(
+ example='ACTIVE',
+ description='The current state of the Performance ' +
+ 'Measurement Job'),
+ 'collectionInterval': fields.Integer(
+ example=300,
+ description='The interval at which performance measures ' +
+ 'will be collected and stored'),
+ 'resourceScopeCriteria': Json2Dict(
+ example={'resourceType': 'compute_node'},
+ description='Key value pairs of resource attributes which ' +
+ 'are used to select resources'),
+ 'measurementSelectionCriteria': Json2Dict(
+ example=[{"measurementGroup": "MemoryUsage"},
+ {"measurementName": "cpuAverageUtilization"}],
+ description='Key value pairs that identify the distinct ' +
+ 'set of measurements'),
+ 'status': fields.String(
+ example='RUNNING',
+ description='This reflects the condition within the state'),
+ 'preinstalledJob': fields.Boolean(
+ example=False,
+ description='Boolean which is True if created by O-Cloud ' +
+ 'and False for external consumer'),
+ 'qualifiedResourceTypes': fields.List(
+ fields.String,
+ example=['7c491f8f-7207-4c00-9b67-3d2ee8b008f0',
+ '31040dec-8106-44db-83bc-62e1d618ea17'],
+ description='The distinct set of ResourceTypes among ' +
+ 'those measuredResources'),
+ 'measuredResources': fields.List(
+ fields.Nested(api_performance_v1.model('MeasuredResource', {
+ 'resourceId': fields.String(),
+ 'resourceTypeId': fields.String()
+ })),
+ description='Historical list of resources measured by this job'
+ ),
+ 'collectedMeasurements': fields.List(
+ fields.Nested(api_performance_v1.model(
+ 'CollectedMeasurement', {
+ 'measurementId': fields.String()
+ })),
+ description='Historical list of measurements collected ' +
+ 'by this job'
+ ),
+ 'extensions': Json2Dict(attribute='extensions')
+ }
+ )
--- /dev/null
+# Copyright (C) 2024-2025 Wind River Systems, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from flask import request
+from flask_restx import Resource, reqparse
+
+from o2common.service.messagebus import MessageBus
+from o2common.views.pagination_route import link_header, PAGE_PARAM
+from o2common.views.route_exception import NotFoundException
+from o2ims.views.api_ns import api_ims_performance as api_performance_v1
+from o2ims.views.performance_dto import PerformanceDTO, PerformanceApiV1DTO
+from o2ims.views import performance_view
+
+from o2common.helper import o2logging
+logger = o2logging.get_logger(__name__)
+
+
+def configure_api_route():
+ # Set global bus for resource
+ global bus
+ bus = MessageBus.get_instance()
+
+
+# ---------- API versions ---------- #
+@api_performance_v1.route("/v1/api_versions")
+class VersionRouter(Resource):
+ model = PerformanceApiV1DTO.api_version_info_get
+
+ @api_performance_v1.doc('Get Performance API version')
+ @api_performance_v1.marshal_list_with(model)
+ def get(self):
+ """Get Performance API Version"""
+ return {
+ 'uriPrefix': request.base_url.rsplit('/', 1)[0],
+ 'apiVersions': [{
+ 'version': '1.0.0',
+ }]
+ }
+
+
+@api_performance_v1.route("/v1/measurementJobs")
+@api_performance_v1.param(PAGE_PARAM,
+ 'Page number of the results to fetch.' +
+ ' Default: 1',
+ _in='query', default=1)
+@api_performance_v1.param(
+ 'filter',
+ 'Filter of the query.',
+ _in='query')
+class MeasurementJobListRouter(Resource):
+ model = PerformanceDTO.measurement_job_get
+
+ @api_performance_v1.doc('Get Measurement Job List')
+ @api_performance_v1.marshal_list_with(model)
+ def get(self):
+ """Get Measurement Job List"""
+ parser = reqparse.RequestParser()
+ parser.add_argument(PAGE_PARAM, location='args')
+ parser.add_argument('filter', location='args')
+ args = parser.parse_args()
+ kwargs = {}
+ if args.nextpage_opaque_marker is not None:
+ kwargs['page'] = args.nextpage_opaque_marker
+ kwargs['filter'] = args.filter if args.filter is not None else ''
+
+ ret = performance_view.measurement_jobs(bus.uow, **kwargs)
+ return link_header(request.full_path, ret)
+
+
+@api_performance_v1.route("/v1/measurementJobs/<measurementJobId>")
+@api_performance_v1.param('measurementJobId', 'ID of the measurement job')
+@api_performance_v1.response(404, 'Measurement Job not found')
+class MeasurementJobGetRouter(Resource):
+ model = PerformanceDTO.measurement_job_get
+
+ @api_performance_v1.doc('Get Measurement Job Information')
+ @api_performance_v1.marshal_with(model)
+ def get(self, measurementJobId):
+ """Get Measurement Job Information"""
+ result = performance_view.measurement_job_one(measurementJobId,
+ bus.uow)
+ if result is not None:
+ return result
+ raise NotFoundException(
+ "Measurement Job {} doesn't exist".format(measurementJobId))
--- /dev/null
+# Copyright (C) 2024-2025 Wind River Systems, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from o2common.service import unit_of_work
+from o2common.views.view import gen_filter
+from o2common.views.pagination_view import Pagination
+from o2ims.domain.performance_obj import MeasurementJob
+
+from o2common.helper import o2logging
+logger = o2logging.get_logger(__name__)
+
+
+def measurement_jobs(uow: unit_of_work.AbstractUnitOfWork, **kwargs):
+ """Get list of measurement jobs with pagination support"""
+ pagination = Pagination(**kwargs)
+ query_kwargs = pagination.get_pagination()
+ args = gen_filter(MeasurementJob,
+ kwargs['filter']) if 'filter' in kwargs else []
+
+ with uow:
+ li = uow.measurement_jobs.list_with_count(*args, **query_kwargs)
+ return pagination.get_result(li)
+
+
+def measurement_job_one(measurementJobId: str,
+ uow: unit_of_work.AbstractUnitOfWork):
+ """Get a single measurement job by ID"""
+ with uow:
+ first = uow.measurement_jobs.get(measurementJobId)
+ return first.serialize() if first is not None else None
--- /dev/null
+# Copyright (C) 2024-2025 Wind River Systems, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import uuid
+import pytest
+from unittest.mock import MagicMock
+
+from o2common.config import config
+
+from o2ims.domain.resource_type import ResourceTypeEnum
+from o2ims.domain import performance_obj as po
+from o2ims.domain import commands
+from o2ims.views import performance_view
+from o2ims.service.watcher.alarm_watcher import AlarmWatcher
+
+
+@pytest.fixture
+def sample_measurement_job():
+ return po.MeasurementJob(
+ job_id=str(uuid.uuid4()),
+ consumer_job_id=str(uuid.uuid4()),
+ state=po.MeasurementJobState.ACTIVE,
+ collection_interval=300,
+ measurement_criteria=[{"metric": "cpu_usage"}],
+ status=po.MeasurementJobStatus.RUNNING,
+ preinstalled_job=False,
+ resource_criteria={"resourceType": "compute_node"}
+ )
+
+
+def test_view_measurement_jobs(mock_uow, sample_measurement_job):
+ session, uow = mock_uow
+
+ # Mock the response for the measurement jobs
+ measurement_job1 = MagicMock()
+ measurement_job1.serialize.return_value = sample_measurement_job
+
+ order_by = MagicMock()
+ order_by.count.return_value = 1
+ order_by.limit.return_value.offset.return_value = [measurement_job1]
+ session.return_value.query.return_value.filter.return_value.\
+ order_by.return_value = order_by
+
+ # Call the view function
+ result = performance_view.measurement_jobs(uow)
+
+ assert result['count'] == 1
+ ret_list = result['results']
+ assert str(ret_list[0].performanceMeasurementJobId) == \
+ sample_measurement_job.performanceMeasurementJobId
+
+
+def test_view_measurement_job_one(mock_uow, sample_measurement_job):
+ session, uow = mock_uow
+ # Mock None response for a single measurement job
+ session.return_value.query.return_value.filter_by.return_value.first.\
+ return_value.serialize.return_value = None
+
+ # Query return None
+ result = performance_view.measurement_job_one(
+ sample_measurement_job.performanceMeasurementJobId, uow)
+ assert result is None
+
+ # Mock the response for a single measurement job
+ session.return_value.query.return_value.filter_by.return_value.first.\
+ return_value.serialize.return_value = sample_measurement_job
+
+ # Call the view function
+ result = performance_view.measurement_job_one(
+ sample_measurement_job.performanceMeasurementJobId, uow)
+
+ assert str(result.performanceMeasurementJobId) == \
+ sample_measurement_job.performanceMeasurementJobId
+
+
+def test_flask_get_list(mock_flask_uow):
+ session, app = mock_flask_uow
+ order_by = MagicMock()
+ order_by.count.return_value = 0
+ order_by.limit.return_value.offset.return_value = []
+ session.return_value.query.return_value.filter.return_value.\
+ order_by.return_value = order_by
+ apibase = config.get_o2ims_performance_api_base() + '/v1'
+
+ with app.test_client() as client:
+ # Get list and return empty list
+ ##########################
+ resp = client.get(apibase+"/measurementJobs")
+ assert resp.get_data() == b'[]\n'
+
+
+def test_flask_get_one(mock_flask_uow, sample_measurement_job):
+ session, app = mock_flask_uow
+
+ session.return_value.query.return_value.filter_by.return_value.\
+ first.return_value = sample_measurement_job
+ apibase = config.get_o2ims_performance_api_base() + '/v1'
+
+ with app.test_client() as client:
+ # Get one and return 200
+ ###########################
+ resp = client.get(apibase+"/measurementJobs/" +
+ sample_measurement_job.performanceMeasurementJobId)
+ assert resp.status_code == 200
+ assert resp.json['performanceMeasurementJobId'] == \
+ sample_measurement_job.performanceMeasurementJobId
+
+
+def test_flask_get_one_not_found(mock_flask_uow):
+ session, app = mock_flask_uow
+
+ session.return_value.query.return_value.filter_by.return_value.\
+ first.return_value = None
+ apibase = config.get_o2ims_performance_api_base() + '/v1'
+
+ with app.test_client() as client:
+ # Get one and return 404
+ ###########################
+ measurement_job_id1 = str(uuid.uuid4())
+ resp = client.get(apibase+"/measurementJobs/"+measurement_job_id1)
+ assert resp.status_code == 404
+
+
+def test_flask_not_allowed_methods(mock_flask_uow):
+ _, app = mock_flask_uow
+ apibase = config.get_o2ims_performance_api_base() + '/v1'
+
+ with app.test_client() as client:
+ # Testing measurement jobs not support method
+ ##########################
+ uri = apibase + "/measurementJobs"
+ resp = client.post(uri)
+ assert resp.status == '405 METHOD NOT ALLOWED'
+ resp = client.put(uri)
+ assert resp.status == '405 METHOD NOT ALLOWED'
+ resp = client.patch(uri)
+ assert resp.status == '405 METHOD NOT ALLOWED'
+ resp = client.delete(uri)
+ assert resp.status == '405 METHOD NOT ALLOWED'
\ No newline at end of file