From: Zhang Rong(Jon) Date: Thu, 23 Jan 2025 16:09:03 +0000 (+0800) Subject: Watch Elastic Metricbeat for Performance Measurement Jobs X-Git-Tag: l-release~8 X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=commitdiff_plain;h=c97804097cd84f3f223a23016ff142724e88e481;p=pti%2Fo2.git Watch Elastic Metricbeat for Performance Measurement Jobs This commit introduces a client implementation to interact with the Elasticsearch API, enabling the querying of event types for performance measurement jobs. Test Plan: PASS - Verify that the aggregation datasets for performance measurement jobs are collected as expected. PASS - Test the performance measurement jobs API to ensure it returns the correct results. PASS - Confirm that if Elasticsearch is not available, the watcher skips the performance measurement (PM) portion as intended. PASS - Ensure that existing performance measurement jobs are skipped and not duplicated. Change-Id: I399c5c14b585ff9e05d7aaea582bd824733e8da3 Signed-off-by: Zhang Rong(Jon) --- diff --git a/configs/o2app.conf b/configs/o2app.conf index ad1e5da..7730a23 100644 --- a/configs/o2app.conf +++ b/configs/o2app.conf @@ -41,3 +41,10 @@ DMS_SUPPORT_PROFILES = [WATCHER] [PUBSUB] + +[PM] +# Elasticsearch connection settings +ES_USERNAME = sysadmin +ES_PASSWORD = sysadmin +ES_PORT = 31001 +ES_PATH = /mon-elasticsearch-client diff --git a/o2app/entrypoints/resource_watcher.py b/o2app/entrypoints/resource_watcher.py index 085fb13..11b833b 100644 --- a/o2app/entrypoints/resource_watcher.py +++ b/o2app/entrypoints/resource_watcher.py @@ -60,6 +60,9 @@ from o2ims.service.watcher.agg_undefined_watcher import UndefinedAggWatcher from o2ims.adapter.clients.aggregate_client import ComputeAggClient, \ NetworkAggClient, StorageAggClient, UndefinedAggClient +from o2ims.adapter.clients.pm_client import MeasurementJobClient +from o2ims.service.watcher.measurement_watcher import MeasurementWatcher + from o2common.helper import o2logging logger = o2logging.get_logger(__name__) @@ -80,9 +83,7 @@ class WatcherService(cotyledon.Service): root = WatcherTree(OcloudWatcher( StxOcloudClient(), self.bus)) root.addchild( - DmsWatcher(StxDmsClient(), self.bus)) - # root.addchild( - # AlarmWatcher(StxFaultClient(), self.bus)) + DmsWatcher(StxDmsClient(), self.bus)) child_respool = root.addchild( ResourcePoolWatcher(StxResourcePoolClient(), @@ -120,6 +121,11 @@ class WatcherService(cotyledon.Service): child_respool.addchild( AlarmWatcher(StxAlarmClient(self.bus.uow), self.bus)) + # Add Measurement watch + child_respool.addchild( + MeasurementWatcher(MeasurementJobClient(self.bus.uow), + self.bus)) + self.worker.add_watcher(root) self.worker.start() diff --git a/o2app/service/handlers.py b/o2app/service/handlers.py index 6dc771a..63bf54c 100644 --- a/o2app/service/handlers.py +++ b/o2app/service/handlers.py @@ -28,7 +28,7 @@ from o2ims.service.auditor import ocloud_handler, dms_handler, \ pserver_mem_handler, pserver_port_handler, pserver_if_handler,\ pserver_eth_handler, pserver_acc_handler, alarm_handler, \ pserver_dev_handler, agg_compute_handler, agg_network_handler,\ - agg_storage_handler, agg_undefined_handler + agg_storage_handler, agg_undefined_handler, measurement_handler from o2ims.service.command import notify_handler, registration_handler,\ notify_alarm_handler, clear_alarm_handler, purge_alarm_handler from o2ims.service.event import ocloud_event, resource_event, \ @@ -101,4 +101,5 @@ COMMAND_HANDLERS = { commands.Register2SMO: registration_handler.registry_to_smo, commands.ClearAlarmEvent: clear_alarm_handler.clear_alarm_event, commands.PurgeAlarmEvent: purge_alarm_handler.purge_alarm_event, + commands.UpdateMeasurement: measurement_handler.update_measurement, } # type: Dict[Type[commands.Command], Callable] diff --git a/o2common/config/config.py b/o2common/config/config.py index ab64181..98b0ad9 100644 --- a/o2common/config/config.py +++ b/o2common/config/config.py @@ -430,3 +430,38 @@ def get_min_retention_period(): logger.warning(f"Invalid min_retention_period value: {e}") return _DEFAULT_MIN_RETENTION_PERIOD + + +def get_es_access_info(ip=None): + """Get Elasticsearch access information. + + Args: + ip (str, optional): IP address of the Elasticsearch server. + Defaults to None and will use environment variable. + + Returns: + dict: Dictionary containing Elasticsearch connection details + """ + # Get values from config file + username = config.conf.PM.ES_USERNAME + password = config.conf.PM.ES_PASSWORD + port = config.conf.PM.ES_PORT + path = config.conf.PM.ES_PATH + + # Allow environment variables to override config file + username = os.getenv('ES_USERNAME', username) + password = os.getenv('ES_PASSWORD', password) + port = os.getenv('ES_PORT', port) + path = os.getenv('ES_PATH', path) + + # Use provided IP or fallback to environment variable + ip = ip or os.getenv('ES_IP', None) + + # Construct the URL + url = f'https://{ip}:{port}{path}' + + return { + 'url': url, + 'username': username, + 'password': password, + } diff --git a/o2ims/adapter/clients/pm_client.py b/o2ims/adapter/clients/pm_client.py new file mode 100644 index 0000000..2c7da31 --- /dev/null +++ b/o2ims/adapter/clients/pm_client.py @@ -0,0 +1,228 @@ +# Copyright (C) 2025 Wind River Systems, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import List +from elasticsearch import Elasticsearch +from cgtsclient.client import get_client as get_stx_client +from o2common.config import config +from o2common.service.client.base_client import BaseClient +from o2ims.domain import performance_obj as pm_obj + +from o2common.helper import o2logging +logger = o2logging.get_logger(__name__) + + +class MeasurementJobClient(BaseClient): + def __init__(self, uow, driver=None): + super().__init__() + self.driver = driver if driver else EsPmClientImp() + self.uow = uow + + def _get(self, id): + return self.driver.getMeasurementJobInfo(id) + + def _list(self, **filters) -> List[pm_obj.MeasurementJob]: + return self.driver.getMeasurementJobList(**filters) + + def _set_stx_client(self): + pass + + +class EsPmClientImp(object): + def __init__(self, es_client=None, stx_client=None): + super().__init__() + self.stxclient = stx_client if stx_client else self.getStxClient() + self.es_client = es_client if es_client else self.getEsClient() + + def getStxClient(self): + os_client_args = config.get_stx_access_info() + config_client = get_stx_client(**os_client_args) + return config_client + + def getEsClient(self): + # Get OAM floating IP from iextoam + try: + iextoams = self.stxclient.iextoam.list() + if iextoams and hasattr(iextoams[0], 'oam_floating_ip'): + oam_ip = iextoams[0].oam_floating_ip + logger.debug(f"Using OAM floating IP for ES connection: " + f"{oam_ip}") + else: + logger.warning("No OAM floating IP found, using default") + return None + except Exception as e: + logger.warning(f"Failed to get OAM floating IP: {str(e)}, " + f"using default") + return None + + # Get ES connection info using OAM floating IP + es_config = config.get_es_access_info(ip=oam_ip) + + es_client = Elasticsearch( + es_config['url'], + basic_auth=(es_config['username'], es_config['password']), + verify_certs=False + ) + if not es_client.ping(): + logger.error("Failed to connect to Elasticsearch") + return None + return es_client + + def getMeasurementJobList(self, **filters) -> List[pm_obj.MeasurementJob]: + # Check ES client connection first + try: + if not self.es_client or not self.es_client.ping(): + logger.warning("Elasticsearch client is not available") + return [] + except Exception as e: + logger.error(f"Failed to ping Elasticsearch: {str(e)}") + return [] + + query = self._build_query(**filters) + try: + response = self.es_client.search( + body=query + ) + return [self._convert_to_measurement_job(bucket) + for bucket in + response['aggregations']['unique_datasets']['buckets']] + except Exception as e: + logger.error(f"Error querying Elasticsearch: {str(e)}") + return [] + + def getMeasurementJobInfo(self, id) -> pm_obj.MeasurementJob: + """Get detailed information for a specific measurement job.""" + query = { + "size": 1, + "query": { + "bool": { + "must": [ + {"term": {"event.dataset": id}} + ] + } + } + } + + try: + response = self.es_client.search( + body=query + ) + if response['hits']['hits']: + return self._convert_to_measurement_job_detail( + response['hits']['hits'][0], id) + else: + logger.warning(f"No data found for measurement job {id}") + return None + except Exception as e: + logger.error(f"Error getting measurement job info for {id}: " + f"{str(e)}") + raise + + @staticmethod + def _build_query(**filters): + must_conditions = [ + { + "term": { + "type": "beats" + } + }, + { + "term": { + "service.type": "system" + } + }, + { + "range": { + "@timestamp": { + "gte": "now-1h" + } + } + } + ] + + # Add host filter if provided + if 'host_name' in filters: + must_conditions.append({ + "term": { + "host.name": filters['host_name'] + } + }) + + query = { + "size": 0, + "query": { + "bool": { + "must": must_conditions + } + }, + "aggs": { + "unique_datasets": { + "terms": { + "field": "event.dataset", + "size": filters.get('size', 100) + } + } + } + } + + return query + + @staticmethod + def _convert_to_measurement_job(bucket) -> pm_obj.MeasurementJob: + """Convert ES aggregation bucket to MeasurementJob.""" + job_id = bucket['key'] # e.g., "system.cpu" + doc_count = bucket['doc_count'] + + return pm_obj.MeasurementJob( + job_id=job_id, + consumer_job_id=f"consumer_{job_id}", + state=pm_obj.MeasurementJobState.ACTIVE, + status=pm_obj.MeasurementJobStatus.RUNNING, + collection_interval=60, # Default collection interval in seconds + measurement_criteria=[{ + "measurementType": job_id, + "measurementName": job_id, + "sampleCount": doc_count + }], + preinstalled_job=True, + resource_criteria={ + "resourceType": "pserver" + } + ) + + @staticmethod + def _convert_to_measurement_job_detail(hit, job_id) -> \ + pm_obj.MeasurementJob: + """Convert ES document to detailed MeasurementJob.""" + source = hit['_source'] + return pm_obj.MeasurementJob( + job_id=job_id, + consumer_job_id=f"consumer_{job_id}", + state=pm_obj.MeasurementJobState.ACTIVE, + status=pm_obj.MeasurementJobStatus.RUNNING, + collection_interval=60, + measurement_criteria=[ + { + "measurementType": job_id, + "measurementName": source.get('event', {}).get('dataset'), + "performanceMetric": source.get('metricset', {}). + get('name', '') + } + ], + resource_criteria={ + "resourceType": "pserver", + "resourceName": source.get('host', {}).get('name') + }, + preinstalled_job=True + ) diff --git a/o2ims/adapter/orm.py b/o2ims/adapter/orm.py index 0b51590..d8362ef 100644 --- a/o2ims/adapter/orm.py +++ b/o2ims/adapter/orm.py @@ -283,8 +283,7 @@ measured_resource = Table( Column("hash", String(255)), Column("version_number", Integer), - Column("id", String(255), primary_key=True), - Column("resourceId", String(255)), + Column("resourceId", String(255), primary_key=True), Column("resourceTypeId", String(255)), Column("measurementJobId", ForeignKey( "measurementJob.performanceMeasurementJobId")) @@ -298,8 +297,7 @@ collected_measurement = Table( Column("hash", String(255)), Column("version_number", Integer), - Column("id", String(255), primary_key=True), - Column("measurementId", String(255)), + Column("measurementId", String(255), primary_key=True), Column("measurementJobId", ForeignKey( "measurementJob.performanceMeasurementJobId")) ) diff --git a/o2ims/domain/commands.py b/o2ims/domain/commands.py index f335f5f..7df1c16 100644 --- a/o2ims/domain/commands.py +++ b/o2ims/domain/commands.py @@ -19,6 +19,7 @@ from dataclasses import dataclass from o2ims.domain.stx_object import StxGenericModel from o2ims.domain.alarm_obj import AlarmEvent2SMO, FaultGenericModel +from o2ims.domain.performance_obj import MeasurementJob from o2ims.domain.subscription_obj import Message2SMO, RegistrationMessage # from o2ims.domain.resource_type import ResourceTypeEnum from o2common.domain.commands import Command @@ -34,6 +35,11 @@ class UpdateFaultObject(Command): data: FaultGenericModel +@dataclass +class UpdatePmObject(Command): + pass + + @dataclass class PubMessage2SMO(Command): data: Message2SMO @@ -148,3 +154,9 @@ class ClearAlarmEvent(UpdateFaultObject): @dataclass class PurgeAlarmEvent(UpdateFaultObject): data: AlarmEvent2SMO + + +@dataclass +class UpdateMeasurement(UpdatePmObject): + measurement: MeasurementJob + parentid: str diff --git a/o2ims/domain/performance_obj.py b/o2ims/domain/performance_obj.py index f5a1170..af1c8a1 100644 --- a/o2ims/domain/performance_obj.py +++ b/o2ims/domain/performance_obj.py @@ -15,6 +15,7 @@ from __future__ import annotations from enum import Enum from typing import List, Dict +import json from o2common.domain.base import AgRoot, Serializer @@ -78,11 +79,50 @@ class MeasurementJob(AgRoot, Serializer): self.consumerPerformanceJobId = consumer_job_id self.state = state self.collectionInterval = collection_interval - self.resourceScopeCriteria = resource_criteria or {} - self.measurementSelectionCriteria = measurement_criteria + self._resourceScopeCriteria = json.dumps(resource_criteria) if resource_criteria else '{}' # noqa: E501 + self._measurementSelectionCriteria = json.dumps(measurement_criteria) if measurement_criteria else '[]' # noqa: E501 self.status = status self.preinstalledJob = preinstalled_job - self.qualifiedResourceTypes: List[str] = [] + self._qualifiedResourceTypes = '[]' self.measuredResources: List[MeasuredResource] = [] self.collectedMeasurements: List[CollectedMeasurement] = [] self.extensions = '' + + @property + def resourceScopeCriteria(self) -> Dict: + return json.loads(self._resourceScopeCriteria) + + @resourceScopeCriteria.setter + def resourceScopeCriteria(self, value: Dict): + self._resourceScopeCriteria = json.dumps(value) if value else '{}' + + @property + def measurementSelectionCriteria(self) -> List: + return json.loads(self._measurementSelectionCriteria) + + @measurementSelectionCriteria.setter + def measurementSelectionCriteria(self, value: List): + self._measurementSelectionCriteria = \ + json.dumps(value) if value else '[]' + + @property + def qualifiedResourceTypes(self) -> List: + return json.loads(self._qualifiedResourceTypes) + + @qualifiedResourceTypes.setter + def qualifiedResourceTypes(self, value: List): + self._qualifiedResourceTypes = json.dumps(value) if value else '[]' + + def __composite_values__(self): + return [ + self.performanceMeasurementJobId, + self.consumerPerformanceJobId, + self.state, + self.collectionInterval, + self._resourceScopeCriteria, + self._measurementSelectionCriteria, + self.status, + self.preinstalledJob, + self._qualifiedResourceTypes, + self.extensions + ] diff --git a/o2ims/service/auditor/measurement_handler.py b/o2ims/service/auditor/measurement_handler.py new file mode 100644 index 0000000..30b083d --- /dev/null +++ b/o2ims/service/auditor/measurement_handler.py @@ -0,0 +1,120 @@ +# Copyright (C) 2025 Wind River Systems, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# pylint: disable=unused-argument +from __future__ import annotations +import json + +from o2common.service.unit_of_work import AbstractUnitOfWork +from o2ims.domain import commands, ocloud +from o2ims.domain.performance_obj import ( + MeasurementJob, MeasurementJobState, MeasurementJobStatus, + MeasuredResource, CollectedMeasurement +) + +from o2common.helper import o2logging +logger = o2logging.get_logger(__name__) + + +def update_measurement( + cmd: commands.UpdateMeasurement, + uow: AbstractUnitOfWork +): + job = cmd.measurement + logger.info(f"Processing measurement job: " + f"{job.performanceMeasurementJobId}") + + with uow: + resourcepool = uow.resource_pools.get(cmd.parentid) + + # Check if measurement job already exists + measurement_job = uow.measurement_jobs.get( + job.performanceMeasurementJobId) + if not measurement_job: + # Create new measurement job + measurement_job = create_measurement_job(job) + # Link to resources + restype = uow.resource_types.get_by_name('pserver') + if restype: + measurement_job.qualifiedResourceTypes.append( + restype.resourceTypeId) + args = [ocloud.Resource.resourceTypeId == + restype.resourceTypeId] + hosts = uow.resources.list(resourcepool.resourcePoolId, *args) + + for host in hosts: + # Add measured resource + measured_resource = MeasuredResource( + host.resourceId, + restype.resourceTypeId, + is_currently_measured=True + ) + measurement_job.measuredResources.append(measured_resource) + + # Add collected measurement + collected_measurement = CollectedMeasurement( + f"{job.performanceMeasurementJobId}_{host.resourceId}", + restype.resourceTypeId, + job.performanceMeasurementJobId, + [], # timeAdded + is_currently_measured=True + ) + measurement_job.collectedMeasurements.append( + collected_measurement) + + uow.measurement_jobs.add(measurement_job) + logger.info(f"Added measurement job: " + f"{job.performanceMeasurementJobId}") + + else: + # Update existing job + if is_job_changed(measurement_job, job): + logger.info(f"Updating measurement job: " + f"{job.performanceMeasurementJobId}") + update_measurement_job(measurement_job, job) + uow.measurement_jobs.update(measurement_job) + + uow.commit() + + +def is_job_changed(current_job: MeasurementJob, + new_job: MeasurementJob) -> bool: + """Check if measurement job needs updating.""" + return (current_job.state != new_job.state or + current_job.status != new_job.status or + current_job.collectionInterval != new_job.collectionInterval) + + +def create_measurement_job(job: MeasurementJob) -> MeasurementJob: + """Create new measurement job.""" + return MeasurementJob( + job_id=job.performanceMeasurementJobId, + consumer_job_id=job.consumerPerformanceJobId, + state=job.state or MeasurementJobState.ACTIVE, + collection_interval=job.collectionInterval, + measurement_criteria=job.measurementSelectionCriteria, + status=job.status or MeasurementJobStatus.RUNNING, + preinstalled_job=job.preinstalledJob, + resource_criteria=job.resourceScopeCriteria + ) + + +def update_measurement_job(target: MeasurementJob, + source: MeasurementJob) -> None: + """Update existing measurement job with new data.""" + target.state = source.state + target.status = source.status + target.collectionInterval = source.collectionInterval + target.measurementSelectionCriteria = source.measurementSelectionCriteria + target.resourceScopeCriteria = source.resourceScopeCriteria diff --git a/o2ims/service/watcher/measurement_watcher.py b/o2ims/service/watcher/measurement_watcher.py new file mode 100644 index 0000000..0482cb8 --- /dev/null +++ b/o2ims/service/watcher/measurement_watcher.py @@ -0,0 +1,52 @@ +# Copyright (C) 2025 Wind River Systems, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from o2common.domain import tags +from o2common.service.messagebus import MessageBus +from o2common.service.watcher.base import BaseWatcher +from o2common.service.client.base_client import BaseClient + +from o2ims.domain import commands +from o2ims.domain.stx_object import StxGenericModel + +from o2common.helper import o2logging +logger = o2logging.get_logger(__name__) + + +class MeasurementWatcher(BaseWatcher): + def __init__(self, pm_client: BaseClient, + bus: MessageBus) -> None: + super().__init__(pm_client, bus) + self._tags = tags.Tag() + self.poolid = None + + def _targetname(self): + return "measurement" + + def _probe(self, parent: StxGenericModel, tags: object = None): + # Set a tag for children resource + self._tags.pool = parent.res_pool_id + self._set_respool_client() + + # Get new measurements + newmodels = self._client.list( + size=100, # Configurable batch size + event_module='system' # Filter for system metrics + ) + + return [commands.UpdateMeasurement(m, self.poolid) + for m in newmodels] if len(newmodels) > 0 else [] + + def _set_respool_client(self): + self.poolid = self._tags.pool diff --git a/requirements-stx.txt b/requirements-stx.txt index 652fe5b..1fc4ab6 100644 --- a/requirements-stx.txt +++ b/requirements-stx.txt @@ -5,4 +5,6 @@ # Updated to lastest commit at stx 9.0 -e git+https://opendev.org/starlingx/distcloud-client.git@r/stx.9.0#egg=distributedcloud-client&subdirectory=distributedcloud-client -e git+https://opendev.org/starlingx/config.git@r/stx.9.0#egg=cgtsclient&subdirectory=sysinv/cgts-client/cgts-client --e git+https://opendev.org/starlingx/fault.git@r/stx.9.0#egg=fmclient&subdirectory=python-fmclient/fmclient \ No newline at end of file +-e git+https://opendev.org/starlingx/fault.git@r/stx.9.0#egg=fmclient&subdirectory=python-fmclient/fmclient + +elasticsearch \ No newline at end of file diff --git a/requirements-test.txt b/requirements-test.txt index 09238cd..76ddf99 100644 --- a/requirements-test.txt +++ b/requirements-test.txt @@ -15,3 +15,5 @@ pyOpenSSL # -e git+https://opendev.org/starlingx/distcloud-client.git@master#egg=distributedcloud-client&subdirectory=distributedcloud-client # -e git+https://opendev.org/starlingx/config.git@master#egg=cgtsclient&subdirectory=sysinv/cgts-client/cgts-client + +elasticsearch diff --git a/tests/integration-ocloud/test_clientdriver_pm.py b/tests/integration-ocloud/test_clientdriver_pm.py new file mode 100644 index 0000000..cbfd2ee --- /dev/null +++ b/tests/integration-ocloud/test_clientdriver_pm.py @@ -0,0 +1,135 @@ +# Copyright (C) 2025 Wind River Systems, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest +from elasticsearch import Elasticsearch + +from o2common.config import config +from o2ims.adapter.clients.pm_client import StxPmClientImp +from o2ims.domain import performance_obj as pm_obj + + +@pytest.fixture +def real_es_client(): + es_config = config.get_es_access_info() + es_client = Elasticsearch( + es_config['url'], + basic_auth=(es_config['username'], es_config['password']), + verify_certs=False + ) + yield es_client + + +@pytest.fixture +def real_pm_client(real_es_client): + pm_client = StxPmClientImp(real_es_client) + yield pm_client + + +def test_get_es_client(): + pm_client = StxPmClientImp() + assert pm_client.es_client is not None + assert pm_client.es_client.ping() + + +def test_get_pm_list(real_pm_client): + # Test getting PM list without filters + pm_list = real_pm_client.getPmList() + assert pm_list is not None + if len(pm_list) > 0: + assert isinstance(pm_list[0], pm_obj.PmGenericModel) + assert pm_list[0].id is not None + assert pm_list[0].host_name is not None + + +def test_get_pm_list_with_host_filter(real_pm_client): + # Test getting PM list with host filter + pm_list = real_pm_client.getPmList(host_name="controller-0") + assert pm_list is not None + if len(pm_list) > 0: + assert isinstance(pm_list[0], pm_obj.PmGenericModel) + assert pm_list[0].host_name == "controller-0" + + +def test_get_pm_list_with_size_filter(real_pm_client): + # Test getting PM list with size filter + size = 5 + pm_list = real_pm_client.getPmList(size=size) + assert pm_list is not None + assert len(pm_list) <= size + + +def test_get_pm_info(real_pm_client): + # First get a list to get a valid ID + pm_list = real_pm_client.getPmList() + if len(pm_list) > 0: + pm_id = pm_list[0].id + pm_info = real_pm_client.getPmInfo(pm_id) + assert pm_info is not None + assert isinstance(pm_info, pm_obj.PmGenericModel) + assert pm_info.id == pm_id + + +def test_build_query(): + # Test query building with different filter combinations + filters = { + 'host_name': 'controller-0', + 'size': 5 + } + query = StxPmClientImp._build_query(**filters) + + # Verify query structure + assert query['size'] == 0 + assert 'query' in query + assert 'bool' in query['query'] + assert 'must' in query['query']['bool'] + + # Verify must conditions + must_conditions = query['query']['bool']['must'] + assert {'term': {'type': 'beats'}} in must_conditions + assert {'term': {'service.type': 'system'}} in must_conditions + assert {'term': {'host.name': 'controller-0'}} in must_conditions + + # Verify aggregations + assert 'aggs' in query + assert 'unique_datasets' in query['aggs'] + assert query['aggs']['unique_datasets']['terms']['size'] == 5 + + +def test_convert_to_pm_model(): + # Test PM model conversion + es_hit = { + '_id': 'test_id', + '_source': { + 'host': {'name': 'test_host'}, + 'event': {'module': 'test_module'}, + 'raw_data': {'test': 'data'} + } + } + + pm_model = StxPmClientImp._convert_to_pm_model(es_hit) + assert isinstance(pm_model, pm_obj.PmGenericModel) + assert pm_model.id == 'test_id' + assert pm_model.host_name == 'test_host' + assert pm_model.event_module == 'test_module' + assert pm_model.raw_data == es_hit['_source'] + + +def test_set_pm_client(real_pm_client): + # Test setting PM client for different resource pools + resource_pool_id = 'test_pool' + real_pm_client.setPmClient(resource_pool_id) + # Since setPmClient is a placeholder, just verify + # it doesn't raise an exception + assert True