Watch Elastic Metricbeat for Performance Measurement Jobs 99/14099/1
authorZhang Rong(Jon) <rong.zhang@windriver.com>
Thu, 23 Jan 2025 16:09:03 +0000 (00:09 +0800)
committerJon Zhang <rong.zhang@windriver.com>
Sat, 25 Jan 2025 02:58:37 +0000 (02:58 +0000)
This commit introduces a client implementation to interact with the
Elasticsearch API, enabling the querying of event types for performance
measurement jobs.

Test Plan:

PASS - Verify that the aggregation datasets for performance measurement
       jobs are collected as expected.
PASS - Test the performance measurement jobs API to ensure it returns
       the correct results.
PASS - Confirm that if Elasticsearch is not available, the watcher skips
       the performance measurement (PM) portion as intended.
PASS - Ensure that existing performance measurement jobs are skipped
       and not duplicated.

Change-Id: I399c5c14b585ff9e05d7aaea582bd824733e8da3
Signed-off-by: Zhang Rong(Jon) <rong.zhang@windriver.com>
(cherry picked from commit c97804097cd84f3f223a23016ff142724e88e481)

13 files changed:
configs/o2app.conf
o2app/entrypoints/resource_watcher.py
o2app/service/handlers.py
o2common/config/config.py
o2ims/adapter/clients/pm_client.py [new file with mode: 0644]
o2ims/adapter/orm.py
o2ims/domain/commands.py
o2ims/domain/performance_obj.py
o2ims/service/auditor/measurement_handler.py [new file with mode: 0644]
o2ims/service/watcher/measurement_watcher.py [new file with mode: 0644]
requirements-stx.txt
requirements-test.txt
tests/integration-ocloud/test_clientdriver_pm.py [new file with mode: 0644]

index ad1e5da..7730a23 100644 (file)
@@ -41,3 +41,10 @@ DMS_SUPPORT_PROFILES =
 [WATCHER]
 
 [PUBSUB]
+
+[PM]
+# Elasticsearch connection settings
+ES_USERNAME = sysadmin
+ES_PASSWORD = sysadmin
+ES_PORT = 31001
+ES_PATH = /mon-elasticsearch-client
index 085fb13..11b833b 100644 (file)
@@ -60,6 +60,9 @@ from o2ims.service.watcher.agg_undefined_watcher import UndefinedAggWatcher
 from o2ims.adapter.clients.aggregate_client import ComputeAggClient, \
     NetworkAggClient, StorageAggClient, UndefinedAggClient
 
+from o2ims.adapter.clients.pm_client import MeasurementJobClient
+from o2ims.service.watcher.measurement_watcher import MeasurementWatcher
+
 from o2common.helper import o2logging
 logger = o2logging.get_logger(__name__)
 
@@ -80,9 +83,7 @@ class WatcherService(cotyledon.Service):
             root = WatcherTree(OcloudWatcher(
                 StxOcloudClient(), self.bus))
             root.addchild(
-                DmsWatcher(StxDmsClient(), self.bus))
-            # root.addchild(
-            #     AlarmWatcher(StxFaultClient(), self.bus))
+               DmsWatcher(StxDmsClient(), self.bus))
 
             child_respool = root.addchild(
                 ResourcePoolWatcher(StxResourcePoolClient(),
@@ -120,6 +121,11 @@ class WatcherService(cotyledon.Service):
             child_respool.addchild(
                 AlarmWatcher(StxAlarmClient(self.bus.uow), self.bus))
 
+            # Add Measurement watch
+            child_respool.addchild(
+                MeasurementWatcher(MeasurementJobClient(self.bus.uow),
+                                   self.bus))
+
             self.worker.add_watcher(root)
 
             self.worker.start()
index 6dc771a..63bf54c 100644 (file)
@@ -28,7 +28,7 @@ from o2ims.service.auditor import ocloud_handler, dms_handler, \
     pserver_mem_handler, pserver_port_handler, pserver_if_handler,\
     pserver_eth_handler, pserver_acc_handler, alarm_handler, \
     pserver_dev_handler, agg_compute_handler, agg_network_handler,\
-    agg_storage_handler, agg_undefined_handler
+    agg_storage_handler, agg_undefined_handler, measurement_handler
 from o2ims.service.command import notify_handler, registration_handler,\
     notify_alarm_handler, clear_alarm_handler, purge_alarm_handler
 from o2ims.service.event import ocloud_event, resource_event, \
@@ -101,4 +101,5 @@ COMMAND_HANDLERS = {
     commands.Register2SMO: registration_handler.registry_to_smo,
     commands.ClearAlarmEvent: clear_alarm_handler.clear_alarm_event,
     commands.PurgeAlarmEvent: purge_alarm_handler.purge_alarm_event,
+    commands.UpdateMeasurement: measurement_handler.update_measurement,
 }  # type: Dict[Type[commands.Command], Callable]
index ab64181..98b0ad9 100644 (file)
@@ -430,3 +430,38 @@ def get_min_retention_period():
         logger.warning(f"Invalid min_retention_period value: {e}")
 
     return _DEFAULT_MIN_RETENTION_PERIOD
+
+
+def get_es_access_info(ip=None):
+    """Get Elasticsearch access information.
+
+    Args:
+        ip (str, optional): IP address of the Elasticsearch server.
+            Defaults to None and will use environment variable.
+
+    Returns:
+        dict: Dictionary containing Elasticsearch connection details
+    """
+    # Get values from config file
+    username = config.conf.PM.ES_USERNAME
+    password = config.conf.PM.ES_PASSWORD
+    port = config.conf.PM.ES_PORT
+    path = config.conf.PM.ES_PATH
+
+    # Allow environment variables to override config file
+    username = os.getenv('ES_USERNAME', username)
+    password = os.getenv('ES_PASSWORD', password)
+    port = os.getenv('ES_PORT', port)
+    path = os.getenv('ES_PATH', path)
+
+    # Use provided IP or fallback to environment variable
+    ip = ip or os.getenv('ES_IP', None)
+
+    # Construct the URL
+    url = f'https://{ip}:{port}{path}'
+
+    return {
+        'url': url,
+        'username': username,
+        'password': password,
+    }
diff --git a/o2ims/adapter/clients/pm_client.py b/o2ims/adapter/clients/pm_client.py
new file mode 100644 (file)
index 0000000..2c7da31
--- /dev/null
@@ -0,0 +1,228 @@
+# Copyright (C) 2025 Wind River Systems, Inc.
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+from typing import List
+from elasticsearch import Elasticsearch
+from cgtsclient.client import get_client as get_stx_client
+from o2common.config import config
+from o2common.service.client.base_client import BaseClient
+from o2ims.domain import performance_obj as pm_obj
+
+from o2common.helper import o2logging
+logger = o2logging.get_logger(__name__)
+
+
+class MeasurementJobClient(BaseClient):
+    def __init__(self, uow, driver=None):
+        super().__init__()
+        self.driver = driver if driver else EsPmClientImp()
+        self.uow = uow
+
+    def _get(self, id):
+        return self.driver.getMeasurementJobInfo(id)
+
+    def _list(self, **filters) -> List[pm_obj.MeasurementJob]:
+        return self.driver.getMeasurementJobList(**filters)
+
+    def _set_stx_client(self):
+        pass
+
+
+class EsPmClientImp(object):
+    def __init__(self, es_client=None, stx_client=None):
+        super().__init__()
+        self.stxclient = stx_client if stx_client else self.getStxClient()
+        self.es_client = es_client if es_client else self.getEsClient()
+
+    def getStxClient(self):
+        os_client_args = config.get_stx_access_info()
+        config_client = get_stx_client(**os_client_args)
+        return config_client
+
+    def getEsClient(self):
+        # Get OAM floating IP from iextoam
+        try:
+            iextoams = self.stxclient.iextoam.list()
+            if iextoams and hasattr(iextoams[0], 'oam_floating_ip'):
+                oam_ip = iextoams[0].oam_floating_ip
+                logger.debug(f"Using OAM floating IP for ES connection: "
+                             f"{oam_ip}")
+            else:
+                logger.warning("No OAM floating IP found, using default")
+                return None
+        except Exception as e:
+            logger.warning(f"Failed to get OAM floating IP: {str(e)}, "
+                           f"using default")
+            return None
+
+        # Get ES connection info using OAM floating IP
+        es_config = config.get_es_access_info(ip=oam_ip)
+
+        es_client = Elasticsearch(
+            es_config['url'],
+            basic_auth=(es_config['username'], es_config['password']),
+            verify_certs=False
+        )
+        if not es_client.ping():
+            logger.error("Failed to connect to Elasticsearch")
+            return None
+        return es_client
+
+    def getMeasurementJobList(self, **filters) -> List[pm_obj.MeasurementJob]:
+        # Check ES client connection first
+        try:
+            if not self.es_client or not self.es_client.ping():
+                logger.warning("Elasticsearch client is not available")
+                return []
+        except Exception as e:
+            logger.error(f"Failed to ping Elasticsearch: {str(e)}")
+            return []
+
+        query = self._build_query(**filters)
+        try:
+            response = self.es_client.search(
+                body=query
+            )
+            return [self._convert_to_measurement_job(bucket)
+                    for bucket in
+                    response['aggregations']['unique_datasets']['buckets']]
+        except Exception as e:
+            logger.error(f"Error querying Elasticsearch: {str(e)}")
+            return []
+
+    def getMeasurementJobInfo(self, id) -> pm_obj.MeasurementJob:
+        """Get detailed information for a specific measurement job."""
+        query = {
+            "size": 1,
+            "query": {
+                "bool": {
+                    "must": [
+                        {"term": {"event.dataset": id}}
+                    ]
+                }
+            }
+        }
+
+        try:
+            response = self.es_client.search(
+                body=query
+            )
+            if response['hits']['hits']:
+                return self._convert_to_measurement_job_detail(
+                    response['hits']['hits'][0], id)
+            else:
+                logger.warning(f"No data found for measurement job {id}")
+                return None
+        except Exception as e:
+            logger.error(f"Error getting measurement job info for {id}: "
+                         f"{str(e)}")
+            raise
+
+    @staticmethod
+    def _build_query(**filters):
+        must_conditions = [
+            {
+                "term": {
+                    "type": "beats"
+                }
+            },
+            {
+                "term": {
+                    "service.type": "system"
+                }
+            },
+            {
+                "range": {
+                    "@timestamp": {
+                        "gte": "now-1h"
+                    }
+                }
+            }
+        ]
+
+        # Add host filter if provided
+        if 'host_name' in filters:
+            must_conditions.append({
+                "term": {
+                    "host.name": filters['host_name']
+                }
+            })
+
+        query = {
+            "size": 0,
+            "query": {
+                "bool": {
+                    "must": must_conditions
+                }
+            },
+            "aggs": {
+                "unique_datasets": {
+                    "terms": {
+                        "field": "event.dataset",
+                        "size": filters.get('size', 100)
+                    }
+                }
+            }
+        }
+
+        return query
+
+    @staticmethod
+    def _convert_to_measurement_job(bucket) -> pm_obj.MeasurementJob:
+        """Convert ES aggregation bucket to MeasurementJob."""
+        job_id = bucket['key']  # e.g., "system.cpu"
+        doc_count = bucket['doc_count']
+
+        return pm_obj.MeasurementJob(
+            job_id=job_id,
+            consumer_job_id=f"consumer_{job_id}",
+            state=pm_obj.MeasurementJobState.ACTIVE,
+            status=pm_obj.MeasurementJobStatus.RUNNING,
+            collection_interval=60,  # Default collection interval in seconds
+            measurement_criteria=[{
+                "measurementType": job_id,
+                "measurementName": job_id,
+                "sampleCount": doc_count
+            }],
+            preinstalled_job=True,
+            resource_criteria={
+                "resourceType": "pserver"
+            }
+        )
+
+    @staticmethod
+    def _convert_to_measurement_job_detail(hit, job_id) -> \
+            pm_obj.MeasurementJob:
+        """Convert ES document to detailed MeasurementJob."""
+        source = hit['_source']
+        return pm_obj.MeasurementJob(
+            job_id=job_id,
+            consumer_job_id=f"consumer_{job_id}",
+            state=pm_obj.MeasurementJobState.ACTIVE,
+            status=pm_obj.MeasurementJobStatus.RUNNING,
+            collection_interval=60,
+            measurement_criteria=[
+                {
+                    "measurementType": job_id,
+                    "measurementName": source.get('event', {}).get('dataset'),
+                    "performanceMetric": source.get('metricset', {}).
+                    get('name', '')
+                }
+            ],
+            resource_criteria={
+                "resourceType": "pserver",
+                "resourceName": source.get('host', {}).get('name')
+            },
+            preinstalled_job=True
+        )
index 0b51590..d8362ef 100644 (file)
@@ -283,8 +283,7 @@ measured_resource = Table(
     Column("hash", String(255)),
     Column("version_number", Integer),
 
-    Column("id", String(255), primary_key=True),
-    Column("resourceId", String(255)),
+    Column("resourceId", String(255), primary_key=True),
     Column("resourceTypeId", String(255)),
     Column("measurementJobId", ForeignKey(
         "measurementJob.performanceMeasurementJobId"))
@@ -298,8 +297,7 @@ collected_measurement = Table(
     Column("hash", String(255)),
     Column("version_number", Integer),
 
-    Column("id", String(255), primary_key=True),
-    Column("measurementId", String(255)),
+    Column("measurementId", String(255), primary_key=True),
     Column("measurementJobId", ForeignKey(
         "measurementJob.performanceMeasurementJobId"))
 )
index f335f5f..7df1c16 100644 (file)
@@ -19,6 +19,7 @@ from dataclasses import dataclass
 
 from o2ims.domain.stx_object import StxGenericModel
 from o2ims.domain.alarm_obj import AlarmEvent2SMO, FaultGenericModel
+from o2ims.domain.performance_obj import MeasurementJob
 from o2ims.domain.subscription_obj import Message2SMO, RegistrationMessage
 # from o2ims.domain.resource_type import ResourceTypeEnum
 from o2common.domain.commands import Command
@@ -34,6 +35,11 @@ class UpdateFaultObject(Command):
     data: FaultGenericModel
 
 
+@dataclass
+class UpdatePmObject(Command):
+    pass
+
+
 @dataclass
 class PubMessage2SMO(Command):
     data: Message2SMO
@@ -148,3 +154,9 @@ class ClearAlarmEvent(UpdateFaultObject):
 @dataclass
 class PurgeAlarmEvent(UpdateFaultObject):
     data: AlarmEvent2SMO
+
+
+@dataclass
+class UpdateMeasurement(UpdatePmObject):
+    measurement: MeasurementJob
+    parentid: str
index f5a1170..af1c8a1 100644 (file)
@@ -15,6 +15,7 @@
 from __future__ import annotations
 from enum import Enum
 from typing import List, Dict
+import json
 from o2common.domain.base import AgRoot, Serializer
 
 
@@ -78,11 +79,50 @@ class MeasurementJob(AgRoot, Serializer):
         self.consumerPerformanceJobId = consumer_job_id
         self.state = state
         self.collectionInterval = collection_interval
-        self.resourceScopeCriteria = resource_criteria or {}
-        self.measurementSelectionCriteria = measurement_criteria
+        self._resourceScopeCriteria = json.dumps(resource_criteria) if resource_criteria else '{}'  # noqa: E501
+        self._measurementSelectionCriteria = json.dumps(measurement_criteria) if measurement_criteria else '[]'  # noqa: E501
         self.status = status
         self.preinstalledJob = preinstalled_job
-        self.qualifiedResourceTypes: List[str] = []
+        self._qualifiedResourceTypes = '[]'
         self.measuredResources: List[MeasuredResource] = []
         self.collectedMeasurements: List[CollectedMeasurement] = []
         self.extensions = ''
+
+    @property
+    def resourceScopeCriteria(self) -> Dict:
+        return json.loads(self._resourceScopeCriteria)
+
+    @resourceScopeCriteria.setter
+    def resourceScopeCriteria(self, value: Dict):
+        self._resourceScopeCriteria = json.dumps(value) if value else '{}'
+
+    @property
+    def measurementSelectionCriteria(self) -> List:
+        return json.loads(self._measurementSelectionCriteria)
+
+    @measurementSelectionCriteria.setter
+    def measurementSelectionCriteria(self, value: List):
+        self._measurementSelectionCriteria = \
+            json.dumps(value) if value else '[]'
+
+    @property
+    def qualifiedResourceTypes(self) -> List:
+        return json.loads(self._qualifiedResourceTypes)
+
+    @qualifiedResourceTypes.setter
+    def qualifiedResourceTypes(self, value: List):
+        self._qualifiedResourceTypes = json.dumps(value) if value else '[]'
+
+    def __composite_values__(self):
+        return [
+            self.performanceMeasurementJobId,
+            self.consumerPerformanceJobId,
+            self.state,
+            self.collectionInterval,
+            self._resourceScopeCriteria,
+            self._measurementSelectionCriteria,
+            self.status,
+            self.preinstalledJob,
+            self._qualifiedResourceTypes,
+            self.extensions
+        ]
diff --git a/o2ims/service/auditor/measurement_handler.py b/o2ims/service/auditor/measurement_handler.py
new file mode 100644 (file)
index 0000000..30b083d
--- /dev/null
@@ -0,0 +1,120 @@
+# Copyright (C) 2025 Wind River Systems, Inc.
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+# pylint: disable=unused-argument
+from __future__ import annotations
+import json
+
+from o2common.service.unit_of_work import AbstractUnitOfWork
+from o2ims.domain import commands, ocloud
+from o2ims.domain.performance_obj import (
+    MeasurementJob, MeasurementJobState, MeasurementJobStatus,
+    MeasuredResource, CollectedMeasurement
+)
+
+from o2common.helper import o2logging
+logger = o2logging.get_logger(__name__)
+
+
+def update_measurement(
+    cmd: commands.UpdateMeasurement,
+    uow: AbstractUnitOfWork
+):
+    job = cmd.measurement
+    logger.info(f"Processing measurement job: "
+                f"{job.performanceMeasurementJobId}")
+
+    with uow:
+        resourcepool = uow.resource_pools.get(cmd.parentid)
+
+        # Check if measurement job already exists
+        measurement_job = uow.measurement_jobs.get(
+            job.performanceMeasurementJobId)
+        if not measurement_job:
+            # Create new measurement job
+            measurement_job = create_measurement_job(job)
+            # Link to resources
+            restype = uow.resource_types.get_by_name('pserver')
+            if restype:
+                measurement_job.qualifiedResourceTypes.append(
+                    restype.resourceTypeId)
+                args = [ocloud.Resource.resourceTypeId ==
+                        restype.resourceTypeId]
+                hosts = uow.resources.list(resourcepool.resourcePoolId, *args)
+
+                for host in hosts:
+                    # Add measured resource
+                    measured_resource = MeasuredResource(
+                        host.resourceId,
+                        restype.resourceTypeId,
+                        is_currently_measured=True
+                    )
+                    measurement_job.measuredResources.append(measured_resource)
+
+                    # Add collected measurement
+                    collected_measurement = CollectedMeasurement(
+                        f"{job.performanceMeasurementJobId}_{host.resourceId}",
+                        restype.resourceTypeId,
+                        job.performanceMeasurementJobId,
+                        [],  # timeAdded
+                        is_currently_measured=True
+                    )
+                    measurement_job.collectedMeasurements.append(
+                        collected_measurement)
+
+            uow.measurement_jobs.add(measurement_job)
+            logger.info(f"Added measurement job: "
+                        f"{job.performanceMeasurementJobId}")
+
+        else:
+            # Update existing job
+            if is_job_changed(measurement_job, job):
+                logger.info(f"Updating measurement job: "
+                            f"{job.performanceMeasurementJobId}")
+                update_measurement_job(measurement_job, job)
+                uow.measurement_jobs.update(measurement_job)
+
+        uow.commit()
+
+
+def is_job_changed(current_job: MeasurementJob,
+                   new_job: MeasurementJob) -> bool:
+    """Check if measurement job needs updating."""
+    return (current_job.state != new_job.state or
+            current_job.status != new_job.status or
+            current_job.collectionInterval != new_job.collectionInterval)
+
+
+def create_measurement_job(job: MeasurementJob) -> MeasurementJob:
+    """Create new measurement job."""
+    return MeasurementJob(
+        job_id=job.performanceMeasurementJobId,
+        consumer_job_id=job.consumerPerformanceJobId,
+        state=job.state or MeasurementJobState.ACTIVE,
+        collection_interval=job.collectionInterval,
+        measurement_criteria=job.measurementSelectionCriteria,
+        status=job.status or MeasurementJobStatus.RUNNING,
+        preinstalled_job=job.preinstalledJob,
+        resource_criteria=job.resourceScopeCriteria
+    )
+
+
+def update_measurement_job(target: MeasurementJob,
+                           source: MeasurementJob) -> None:
+    """Update existing measurement job with new data."""
+    target.state = source.state
+    target.status = source.status
+    target.collectionInterval = source.collectionInterval
+    target.measurementSelectionCriteria = source.measurementSelectionCriteria
+    target.resourceScopeCriteria = source.resourceScopeCriteria
diff --git a/o2ims/service/watcher/measurement_watcher.py b/o2ims/service/watcher/measurement_watcher.py
new file mode 100644 (file)
index 0000000..0482cb8
--- /dev/null
@@ -0,0 +1,52 @@
+# Copyright (C) 2025 Wind River Systems, Inc.
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+from o2common.domain import tags
+from o2common.service.messagebus import MessageBus
+from o2common.service.watcher.base import BaseWatcher
+from o2common.service.client.base_client import BaseClient
+
+from o2ims.domain import commands
+from o2ims.domain.stx_object import StxGenericModel
+
+from o2common.helper import o2logging
+logger = o2logging.get_logger(__name__)
+
+
+class MeasurementWatcher(BaseWatcher):
+    def __init__(self, pm_client: BaseClient,
+                 bus: MessageBus) -> None:
+        super().__init__(pm_client, bus)
+        self._tags = tags.Tag()
+        self.poolid = None
+
+    def _targetname(self):
+        return "measurement"
+
+    def _probe(self, parent: StxGenericModel, tags: object = None):
+        # Set a tag for children resource
+        self._tags.pool = parent.res_pool_id
+        self._set_respool_client()
+
+        # Get new measurements
+        newmodels = self._client.list(
+            size=100,  # Configurable batch size
+            event_module='system'  # Filter for system metrics
+        )
+
+        return [commands.UpdateMeasurement(m, self.poolid)
+                for m in newmodels] if len(newmodels) > 0 else []
+
+    def _set_respool_client(self):
+        self.poolid = self._tags.pool
index 652fe5b..1fc4ab6 100644 (file)
@@ -5,4 +5,6 @@
 # Updated to lastest commit at stx 9.0
 -e git+https://opendev.org/starlingx/distcloud-client.git@r/stx.9.0#egg=distributedcloud-client&subdirectory=distributedcloud-client
 -e git+https://opendev.org/starlingx/config.git@r/stx.9.0#egg=cgtsclient&subdirectory=sysinv/cgts-client/cgts-client
--e git+https://opendev.org/starlingx/fault.git@r/stx.9.0#egg=fmclient&subdirectory=python-fmclient/fmclient
\ No newline at end of file
+-e git+https://opendev.org/starlingx/fault.git@r/stx.9.0#egg=fmclient&subdirectory=python-fmclient/fmclient
+
+elasticsearch
\ No newline at end of file
index 09238cd..76ddf99 100644 (file)
@@ -15,3 +15,5 @@ pyOpenSSL
 
 # -e git+https://opendev.org/starlingx/distcloud-client.git@master#egg=distributedcloud-client&subdirectory=distributedcloud-client
 # -e git+https://opendev.org/starlingx/config.git@master#egg=cgtsclient&subdirectory=sysinv/cgts-client/cgts-client
+
+elasticsearch
diff --git a/tests/integration-ocloud/test_clientdriver_pm.py b/tests/integration-ocloud/test_clientdriver_pm.py
new file mode 100644 (file)
index 0000000..cbfd2ee
--- /dev/null
@@ -0,0 +1,135 @@
+# Copyright (C) 2025 Wind River Systems, Inc.
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+import pytest
+from elasticsearch import Elasticsearch
+
+from o2common.config import config
+from o2ims.adapter.clients.pm_client import StxPmClientImp
+from o2ims.domain import performance_obj as pm_obj
+
+
+@pytest.fixture
+def real_es_client():
+    es_config = config.get_es_access_info()
+    es_client = Elasticsearch(
+        es_config['url'],
+        basic_auth=(es_config['username'], es_config['password']),
+        verify_certs=False
+    )
+    yield es_client
+
+
+@pytest.fixture
+def real_pm_client(real_es_client):
+    pm_client = StxPmClientImp(real_es_client)
+    yield pm_client
+
+
+def test_get_es_client():
+    pm_client = StxPmClientImp()
+    assert pm_client.es_client is not None
+    assert pm_client.es_client.ping()
+
+
+def test_get_pm_list(real_pm_client):
+    # Test getting PM list without filters
+    pm_list = real_pm_client.getPmList()
+    assert pm_list is not None
+    if len(pm_list) > 0:
+        assert isinstance(pm_list[0], pm_obj.PmGenericModel)
+        assert pm_list[0].id is not None
+        assert pm_list[0].host_name is not None
+
+
+def test_get_pm_list_with_host_filter(real_pm_client):
+    # Test getting PM list with host filter
+    pm_list = real_pm_client.getPmList(host_name="controller-0")
+    assert pm_list is not None
+    if len(pm_list) > 0:
+        assert isinstance(pm_list[0], pm_obj.PmGenericModel)
+        assert pm_list[0].host_name == "controller-0"
+
+
+def test_get_pm_list_with_size_filter(real_pm_client):
+    # Test getting PM list with size filter
+    size = 5
+    pm_list = real_pm_client.getPmList(size=size)
+    assert pm_list is not None
+    assert len(pm_list) <= size
+
+
+def test_get_pm_info(real_pm_client):
+    # First get a list to get a valid ID
+    pm_list = real_pm_client.getPmList()
+    if len(pm_list) > 0:
+        pm_id = pm_list[0].id
+        pm_info = real_pm_client.getPmInfo(pm_id)
+        assert pm_info is not None
+        assert isinstance(pm_info, pm_obj.PmGenericModel)
+        assert pm_info.id == pm_id
+
+
+def test_build_query():
+    # Test query building with different filter combinations
+    filters = {
+        'host_name': 'controller-0',
+        'size': 5
+    }
+    query = StxPmClientImp._build_query(**filters)
+
+    # Verify query structure
+    assert query['size'] == 0
+    assert 'query' in query
+    assert 'bool' in query['query']
+    assert 'must' in query['query']['bool']
+
+    # Verify must conditions
+    must_conditions = query['query']['bool']['must']
+    assert {'term': {'type': 'beats'}} in must_conditions
+    assert {'term': {'service.type': 'system'}} in must_conditions
+    assert {'term': {'host.name': 'controller-0'}} in must_conditions
+
+    # Verify aggregations
+    assert 'aggs' in query
+    assert 'unique_datasets' in query['aggs']
+    assert query['aggs']['unique_datasets']['terms']['size'] == 5
+
+
+def test_convert_to_pm_model():
+    # Test PM model conversion
+    es_hit = {
+        '_id': 'test_id',
+        '_source': {
+            'host': {'name': 'test_host'},
+            'event': {'module': 'test_module'},
+            'raw_data': {'test': 'data'}
+        }
+    }
+
+    pm_model = StxPmClientImp._convert_to_pm_model(es_hit)
+    assert isinstance(pm_model, pm_obj.PmGenericModel)
+    assert pm_model.id == 'test_id'
+    assert pm_model.host_name == 'test_host'
+    assert pm_model.event_module == 'test_module'
+    assert pm_model.raw_data == es_hit['_source']
+
+
+def test_set_pm_client(real_pm_client):
+    # Test setting PM client for different resource pools
+    resource_pool_id = 'test_pool'
+    real_pm_client.setPmClient(resource_pool_id)
+    # Since setPmClient is a placeholder, just verify
+    # it doesn't raise an exception
+    assert True