[WATCHER]
[PUBSUB]
+
+[PM]
+# Elasticsearch connection settings
+ES_USERNAME = sysadmin
+ES_PASSWORD = sysadmin
+ES_PORT = 31001
+ES_PATH = /mon-elasticsearch-client
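+# These defaults may be overridden at runtime by the ES_USERNAME, ES_PASSWORD,
+# ES_PORT and ES_PATH environment variables (see config.get_es_access_info).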
from o2ims.adapter.clients.aggregate_client import ComputeAggClient, \
NetworkAggClient, StorageAggClient, UndefinedAggClient
+from o2ims.adapter.clients.pm_client import MeasurementJobClient
+from o2ims.service.watcher.measurement_watcher import MeasurementWatcher
+
from o2common.helper import o2logging
logger = o2logging.get_logger(__name__)
root = WatcherTree(OcloudWatcher(
StxOcloudClient(), self.bus))
root.addchild(
- DmsWatcher(StxDmsClient(), self.bus))
- # root.addchild(
- # AlarmWatcher(StxFaultClient(), self.bus))
+ DmsWatcher(StxDmsClient(), self.bus))
child_respool = root.addchild(
ResourcePoolWatcher(StxResourcePoolClient(),
child_respool.addchild(
AlarmWatcher(StxAlarmClient(self.bus.uow), self.bus))
+ # Add Measurement watch
+ child_respool.addchild(
+ MeasurementWatcher(MeasurementJobClient(self.bus.uow),
+ self.bus))
+
self.worker.add_watcher(root)
self.worker.start()
pserver_mem_handler, pserver_port_handler, pserver_if_handler,\
pserver_eth_handler, pserver_acc_handler, alarm_handler, \
pserver_dev_handler, agg_compute_handler, agg_network_handler,\
- agg_storage_handler, agg_undefined_handler
+ agg_storage_handler, agg_undefined_handler, measurement_handler
from o2ims.service.command import notify_handler, registration_handler,\
notify_alarm_handler, clear_alarm_handler, purge_alarm_handler
from o2ims.service.event import ocloud_event, resource_event, \
commands.Register2SMO: registration_handler.registry_to_smo,
commands.ClearAlarmEvent: clear_alarm_handler.clear_alarm_event,
commands.PurgeAlarmEvent: purge_alarm_handler.purge_alarm_event,
+ commands.UpdateMeasurement: measurement_handler.update_measurement,
} # type: Dict[Type[commands.Command], Callable]
logger.warning(f"Invalid min_retention_period value: {e}")
return _DEFAULT_MIN_RETENTION_PERIOD
+
+
+def get_es_access_info(ip=None):
+ """Get Elasticsearch access information.
+
+ Args:
+ ip (str, optional): IP address of the Elasticsearch server.
+            Defaults to None, in which case the ES_IP environment
+            variable is used.
+
+ Returns:
+ dict: Dictionary containing Elasticsearch connection details
+ """
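+    # Example result (hypothetical IP, defaults taken from the [PM] section):
+    #   get_es_access_info(ip='10.10.10.2')
+    #   -> {'url': 'https://10.10.10.2:31001/mon-elasticsearch-client',
+    #       'username': 'sysadmin', 'password': 'sysadmin'}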
+ # Get values from config file
+ username = config.conf.PM.ES_USERNAME
+ password = config.conf.PM.ES_PASSWORD
+ port = config.conf.PM.ES_PORT
+ path = config.conf.PM.ES_PATH
+
+ # Allow environment variables to override config file
+ username = os.getenv('ES_USERNAME', username)
+ password = os.getenv('ES_PASSWORD', password)
+ port = os.getenv('ES_PORT', port)
+ path = os.getenv('ES_PATH', path)
+
+ # Use provided IP or fallback to environment variable
+ ip = ip or os.getenv('ES_IP', None)
+
+ # Construct the URL
+ url = f'https://{ip}:{port}{path}'
+
+ return {
+ 'url': url,
+ 'username': username,
+ 'password': password,
+ }
--- /dev/null
+# Copyright (C) 2025 Wind River Systems, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from typing import List
+from elasticsearch import Elasticsearch
+from cgtsclient.client import get_client as get_stx_client
+from o2common.config import config
+from o2common.service.client.base_client import BaseClient
+from o2ims.domain import performance_obj as pm_obj
+
+from o2common.helper import o2logging
+logger = o2logging.get_logger(__name__)
+
+
+class MeasurementJobClient(BaseClient):
+ def __init__(self, uow, driver=None):
+ super().__init__()
+ self.driver = driver if driver else EsPmClientImp()
+ self.uow = uow
+
+ def _get(self, id):
+ return self.driver.getMeasurementJobInfo(id)
+
+ def _list(self, **filters) -> List[pm_obj.MeasurementJob]:
+ return self.driver.getMeasurementJobList(**filters)
+
+ def _set_stx_client(self):
+ pass
+
+
+class EsPmClientImp(object):
+ def __init__(self, es_client=None, stx_client=None):
+ super().__init__()
+ self.stxclient = stx_client if stx_client else self.getStxClient()
+ self.es_client = es_client if es_client else self.getEsClient()
+
+ def getStxClient(self):
+ os_client_args = config.get_stx_access_info()
+ config_client = get_stx_client(**os_client_args)
+ return config_client
+
+ def getEsClient(self):
+ # Get OAM floating IP from iextoam
+ try:
+ iextoams = self.stxclient.iextoam.list()
+ if iextoams and hasattr(iextoams[0], 'oam_floating_ip'):
+ oam_ip = iextoams[0].oam_floating_ip
+ logger.debug(f"Using OAM floating IP for ES connection: "
+ f"{oam_ip}")
+ else:
+ logger.warning("No OAM floating IP found, using default")
+ return None
+ except Exception as e:
+ logger.warning(f"Failed to get OAM floating IP: {str(e)}, "
+ f"using default")
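+            # Callers tolerate a None client here: getMeasurementJobList()
+            # pings the client first and falls back to an empty result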
+ return None
+
+ # Get ES connection info using OAM floating IP
+ es_config = config.get_es_access_info(ip=oam_ip)
+
+ es_client = Elasticsearch(
+ es_config['url'],
+ basic_auth=(es_config['username'], es_config['password']),
+ verify_certs=False
+ )
+ if not es_client.ping():
+ logger.error("Failed to connect to Elasticsearch")
+ return None
+ return es_client
+
+ def getMeasurementJobList(self, **filters) -> List[pm_obj.MeasurementJob]:
+ # Check ES client connection first
+ try:
+ if not self.es_client or not self.es_client.ping():
+ logger.warning("Elasticsearch client is not available")
+ return []
+ except Exception as e:
+ logger.error(f"Failed to ping Elasticsearch: {str(e)}")
+ return []
+
+ query = self._build_query(**filters)
+ try:
+ response = self.es_client.search(
+ body=query
+ )
+ return [self._convert_to_measurement_job(bucket)
+ for bucket in
+ response['aggregations']['unique_datasets']['buckets']]
+ except Exception as e:
+ logger.error(f"Error querying Elasticsearch: {str(e)}")
+ return []
+
+ def getMeasurementJobInfo(self, id) -> pm_obj.MeasurementJob:
+ """Get detailed information for a specific measurement job."""
+ query = {
+ "size": 1,
+ "query": {
+ "bool": {
+ "must": [
+ {"term": {"event.dataset": id}}
+ ]
+ }
+ }
+ }
+
+ try:
+ response = self.es_client.search(
+ body=query
+ )
+ if response['hits']['hits']:
+ return self._convert_to_measurement_job_detail(
+ response['hits']['hits'][0], id)
+ else:
+ logger.warning(f"No data found for measurement job {id}")
+ return None
+ except Exception as e:
+ logger.error(f"Error getting measurement job info for {id}: "
+ f"{str(e)}")
+ raise
+
+ @staticmethod
+ def _build_query(**filters):
+ must_conditions = [
+ {
+ "term": {
+ "type": "beats"
+ }
+ },
+ {
+ "term": {
+ "service.type": "system"
+ }
+ },
+ {
+ "range": {
+ "@timestamp": {
+ "gte": "now-1h"
+ }
+ }
+ }
+ ]
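+        # Only documents from the last hour ("now-1h" above) feed the
+        # per-dataset aggregation built below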
+
+ # Add host filter if provided
+ if 'host_name' in filters:
+ must_conditions.append({
+ "term": {
+ "host.name": filters['host_name']
+ }
+ })
+
+ query = {
+ "size": 0,
+ "query": {
+ "bool": {
+ "must": must_conditions
+ }
+ },
+ "aggs": {
+ "unique_datasets": {
+ "terms": {
+ "field": "event.dataset",
+ "size": filters.get('size', 100)
+ }
+ }
+ }
+ }
+
+ return query
+
+ @staticmethod
+ def _convert_to_measurement_job(bucket) -> pm_obj.MeasurementJob:
+ """Convert ES aggregation bucket to MeasurementJob."""
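+        # 'bucket' is one entry of the unique_datasets terms aggregation
+        # built in _build_query, e.g. {'key': 'system.cpu', 'doc_count': 120}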
+ job_id = bucket['key'] # e.g., "system.cpu"
+ doc_count = bucket['doc_count']
+
+ return pm_obj.MeasurementJob(
+ job_id=job_id,
+ consumer_job_id=f"consumer_{job_id}",
+ state=pm_obj.MeasurementJobState.ACTIVE,
+ status=pm_obj.MeasurementJobStatus.RUNNING,
+ collection_interval=60, # Default collection interval in seconds
+ measurement_criteria=[{
+ "measurementType": job_id,
+ "measurementName": job_id,
+ "sampleCount": doc_count
+ }],
+ preinstalled_job=True,
+ resource_criteria={
+ "resourceType": "pserver"
+ }
+ )
+
+ @staticmethod
+ def _convert_to_measurement_job_detail(hit, job_id) -> \
+ pm_obj.MeasurementJob:
+ """Convert ES document to detailed MeasurementJob."""
+ source = hit['_source']
+ return pm_obj.MeasurementJob(
+ job_id=job_id,
+ consumer_job_id=f"consumer_{job_id}",
+ state=pm_obj.MeasurementJobState.ACTIVE,
+ status=pm_obj.MeasurementJobStatus.RUNNING,
+ collection_interval=60,
+ measurement_criteria=[
+ {
+ "measurementType": job_id,
+ "measurementName": source.get('event', {}).get('dataset'),
+ "performanceMetric": source.get('metricset', {}).
+ get('name', '')
+ }
+ ],
+ resource_criteria={
+ "resourceType": "pserver",
+ "resourceName": source.get('host', {}).get('name')
+ },
+ preinstalled_job=True
+ )
Column("hash", String(255)),
Column("version_number", Integer),
- Column("id", String(255), primary_key=True),
- Column("resourceId", String(255)),
+ Column("resourceId", String(255), primary_key=True),
Column("resourceTypeId", String(255)),
Column("measurementJobId", ForeignKey(
"measurementJob.performanceMeasurementJobId"))
Column("hash", String(255)),
Column("version_number", Integer),
- Column("id", String(255), primary_key=True),
- Column("measurementId", String(255)),
+ Column("measurementId", String(255), primary_key=True),
Column("measurementJobId", ForeignKey(
"measurementJob.performanceMeasurementJobId"))
)
from o2ims.domain.stx_object import StxGenericModel
from o2ims.domain.alarm_obj import AlarmEvent2SMO, FaultGenericModel
+from o2ims.domain.performance_obj import MeasurementJob
from o2ims.domain.subscription_obj import Message2SMO, RegistrationMessage
# from o2ims.domain.resource_type import ResourceTypeEnum
from o2common.domain.commands import Command
data: FaultGenericModel
+@dataclass
+class UpdatePmObject(Command):
+ pass
+
+
@dataclass
class PubMessage2SMO(Command):
data: Message2SMO
@dataclass
class PurgeAlarmEvent(UpdateFaultObject):
data: AlarmEvent2SMO
+
+
+@dataclass
+class UpdateMeasurement(UpdatePmObject):
+ measurement: MeasurementJob
+ parentid: str
from __future__ import annotations
from enum import Enum
from typing import List, Dict
+import json
from o2common.domain.base import AgRoot, Serializer
self.consumerPerformanceJobId = consumer_job_id
self.state = state
self.collectionInterval = collection_interval
- self.resourceScopeCriteria = resource_criteria or {}
- self.measurementSelectionCriteria = measurement_criteria
+ self._resourceScopeCriteria = json.dumps(resource_criteria) if resource_criteria else '{}' # noqa: E501
+ self._measurementSelectionCriteria = json.dumps(measurement_criteria) if measurement_criteria else '[]' # noqa: E501
self.status = status
self.preinstalledJob = preinstalled_job
- self.qualifiedResourceTypes: List[str] = []
+ self._qualifiedResourceTypes = '[]'
self.measuredResources: List[MeasuredResource] = []
self.collectedMeasurements: List[CollectedMeasurement] = []
self.extensions = ''
+
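+    # The criteria and qualified-resource-type fields are stored internally
+    # as JSON strings (presumably so they map onto string-typed ORM columns
+    # via __composite_values__ below); the properties expose them as native
+    # dict/list values.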
+ @property
+ def resourceScopeCriteria(self) -> Dict:
+ return json.loads(self._resourceScopeCriteria)
+
+ @resourceScopeCriteria.setter
+ def resourceScopeCriteria(self, value: Dict):
+ self._resourceScopeCriteria = json.dumps(value) if value else '{}'
+
+ @property
+ def measurementSelectionCriteria(self) -> List:
+ return json.loads(self._measurementSelectionCriteria)
+
+ @measurementSelectionCriteria.setter
+ def measurementSelectionCriteria(self, value: List):
+ self._measurementSelectionCriteria = \
+ json.dumps(value) if value else '[]'
+
+ @property
+ def qualifiedResourceTypes(self) -> List:
+ return json.loads(self._qualifiedResourceTypes)
+
+ @qualifiedResourceTypes.setter
+ def qualifiedResourceTypes(self, value: List):
+ self._qualifiedResourceTypes = json.dumps(value) if value else '[]'
+
+ def __composite_values__(self):
+ return [
+ self.performanceMeasurementJobId,
+ self.consumerPerformanceJobId,
+ self.state,
+ self.collectionInterval,
+ self._resourceScopeCriteria,
+ self._measurementSelectionCriteria,
+ self.status,
+ self.preinstalledJob,
+ self._qualifiedResourceTypes,
+ self.extensions
+ ]
--- /dev/null
+# Copyright (C) 2025 Wind River Systems, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# pylint: disable=unused-argument
+from __future__ import annotations
+
+from o2common.service.unit_of_work import AbstractUnitOfWork
+from o2ims.domain import commands, ocloud
+from o2ims.domain.performance_obj import (
+ MeasurementJob, MeasurementJobState, MeasurementJobStatus,
+ MeasuredResource, CollectedMeasurement
+)
+
+from o2common.helper import o2logging
+logger = o2logging.get_logger(__name__)
+
+
+def update_measurement(
+ cmd: commands.UpdateMeasurement,
+ uow: AbstractUnitOfWork
+):
+ job = cmd.measurement
+ logger.info(f"Processing measurement job: "
+ f"{job.performanceMeasurementJobId}")
+
+ with uow:
+ resourcepool = uow.resource_pools.get(cmd.parentid)
+
+ # Check if measurement job already exists
+ measurement_job = uow.measurement_jobs.get(
+ job.performanceMeasurementJobId)
+ if not measurement_job:
+ # Create new measurement job
+ measurement_job = create_measurement_job(job)
+ # Link to resources
+ restype = uow.resource_types.get_by_name('pserver')
+ if restype:
+                # qualifiedResourceTypes is a JSON-backed property that
+                # returns a fresh list, so reassign it instead of appending
+                # to the temporary copy
+                measurement_job.qualifiedResourceTypes = (
+                    measurement_job.qualifiedResourceTypes +
+                    [restype.resourceTypeId])
+ args = [ocloud.Resource.resourceTypeId ==
+ restype.resourceTypeId]
+ hosts = uow.resources.list(resourcepool.resourcePoolId, *args)
+
+ for host in hosts:
+ # Add measured resource
+ measured_resource = MeasuredResource(
+ host.resourceId,
+ restype.resourceTypeId,
+ is_currently_measured=True
+ )
+ measurement_job.measuredResources.append(measured_resource)
+
+ # Add collected measurement
+ collected_measurement = CollectedMeasurement(
+ f"{job.performanceMeasurementJobId}_{host.resourceId}",
+ restype.resourceTypeId,
+ job.performanceMeasurementJobId,
+ [], # timeAdded
+ is_currently_measured=True
+ )
+ measurement_job.collectedMeasurements.append(
+ collected_measurement)
+
+ uow.measurement_jobs.add(measurement_job)
+ logger.info(f"Added measurement job: "
+ f"{job.performanceMeasurementJobId}")
+
+ else:
+ # Update existing job
+ if is_job_changed(measurement_job, job):
+ logger.info(f"Updating measurement job: "
+ f"{job.performanceMeasurementJobId}")
+ update_measurement_job(measurement_job, job)
+ uow.measurement_jobs.update(measurement_job)
+
+ uow.commit()
+
+
+def is_job_changed(current_job: MeasurementJob,
+ new_job: MeasurementJob) -> bool:
+ """Check if measurement job needs updating."""
+ return (current_job.state != new_job.state or
+ current_job.status != new_job.status or
+ current_job.collectionInterval != new_job.collectionInterval)
+
+
+def create_measurement_job(job: MeasurementJob) -> MeasurementJob:
+ """Create new measurement job."""
+ return MeasurementJob(
+ job_id=job.performanceMeasurementJobId,
+ consumer_job_id=job.consumerPerformanceJobId,
+ state=job.state or MeasurementJobState.ACTIVE,
+ collection_interval=job.collectionInterval,
+ measurement_criteria=job.measurementSelectionCriteria,
+ status=job.status or MeasurementJobStatus.RUNNING,
+ preinstalled_job=job.preinstalledJob,
+ resource_criteria=job.resourceScopeCriteria
+ )
+
+
+def update_measurement_job(target: MeasurementJob,
+ source: MeasurementJob) -> None:
+ """Update existing measurement job with new data."""
+ target.state = source.state
+ target.status = source.status
+ target.collectionInterval = source.collectionInterval
+ target.measurementSelectionCriteria = source.measurementSelectionCriteria
+ target.resourceScopeCriteria = source.resourceScopeCriteria
--- /dev/null
+# Copyright (C) 2025 Wind River Systems, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from o2common.domain import tags
+from o2common.service.messagebus import MessageBus
+from o2common.service.watcher.base import BaseWatcher
+from o2common.service.client.base_client import BaseClient
+
+from o2ims.domain import commands
+from o2ims.domain.stx_object import StxGenericModel
+
+from o2common.helper import o2logging
+logger = o2logging.get_logger(__name__)
+
+
+class MeasurementWatcher(BaseWatcher):
+ def __init__(self, pm_client: BaseClient,
+ bus: MessageBus) -> None:
+ super().__init__(pm_client, bus)
+ self._tags = tags.Tag()
+ self.poolid = None
+
+ def _targetname(self):
+ return "measurement"
+
+ def _probe(self, parent: StxGenericModel, tags: object = None):
+        # Set a tag for child resources
+ self._tags.pool = parent.res_pool_id
+ self._set_respool_client()
+
+ # Get new measurements
+ newmodels = self._client.list(
+ size=100, # Configurable batch size
+ event_module='system' # Filter for system metrics
+ )
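+        # Note: _build_query currently applies only the host_name and size
+        # filters; event_module is accepted by the client but not used in
+        # the Elasticsearch query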
+
+ return [commands.UpdateMeasurement(m, self.poolid)
+ for m in newmodels] if len(newmodels) > 0 else []
+
+ def _set_respool_client(self):
+ self.poolid = self._tags.pool
# Updated to latest commit at stx 9.0
-e git+https://opendev.org/starlingx/distcloud-client.git@r/stx.9.0#egg=distributedcloud-client&subdirectory=distributedcloud-client
-e git+https://opendev.org/starlingx/config.git@r/stx.9.0#egg=cgtsclient&subdirectory=sysinv/cgts-client/cgts-client
--e git+https://opendev.org/starlingx/fault.git@r/stx.9.0#egg=fmclient&subdirectory=python-fmclient/fmclient
\ No newline at end of file
+-e git+https://opendev.org/starlingx/fault.git@r/stx.9.0#egg=fmclient&subdirectory=python-fmclient/fmclient
+
+elasticsearch
\ No newline at end of file
# -e git+https://opendev.org/starlingx/distcloud-client.git@master#egg=distributedcloud-client&subdirectory=distributedcloud-client
# -e git+https://opendev.org/starlingx/config.git@master#egg=cgtsclient&subdirectory=sysinv/cgts-client/cgts-client
+
+elasticsearch
--- /dev/null
+# Copyright (C) 2025 Wind River Systems, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import pytest
+from elasticsearch import Elasticsearch
+
+from o2common.config import config
+from o2ims.adapter.clients.pm_client import EsPmClientImp, \
+    MeasurementJobClient
+from o2ims.domain import performance_obj as pm_obj
+
+
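+# Note: these fixtures talk to a live Elasticsearch instance resolved via
+# config.get_es_access_info, so the tests below run as integration tests
+# and require a reachable cluster.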
+@pytest.fixture
+def real_es_client():
+ es_config = config.get_es_access_info()
+ es_client = Elasticsearch(
+ es_config['url'],
+ basic_auth=(es_config['username'], es_config['password']),
+ verify_certs=False
+ )
+ yield es_client
+
+
+@pytest.fixture
+def real_pm_client(real_es_client):
+    pm_client = EsPmClientImp(real_es_client)
+ yield pm_client
+
+
+def test_get_es_client():
+    pm_client = EsPmClientImp()
+ assert pm_client.es_client is not None
+ assert pm_client.es_client.ping()
+
+
+def test_get_measurement_job_list(real_pm_client):
+    # Test getting the measurement job list without filters
+    job_list = real_pm_client.getMeasurementJobList()
+    assert job_list is not None
+    if len(job_list) > 0:
+        assert isinstance(job_list[0], pm_obj.MeasurementJob)
+        assert job_list[0].performanceMeasurementJobId is not None
+
+
+def test_get_measurement_job_list_with_host_filter(real_pm_client):
+    # Test getting the measurement job list filtered by host name
+    job_list = real_pm_client.getMeasurementJobList(host_name="controller-0")
+    assert job_list is not None
+    if len(job_list) > 0:
+        assert isinstance(job_list[0], pm_obj.MeasurementJob)
+
+
+def test_get_measurement_job_list_with_size_filter(real_pm_client):
+    # Test limiting the number of aggregated datasets returned
+    size = 5
+    job_list = real_pm_client.getMeasurementJobList(size=size)
+    assert job_list is not None
+    assert len(job_list) <= size
+
+
+def test_get_measurement_job_info(real_pm_client):
+    # First get a list to obtain a valid job ID
+    job_list = real_pm_client.getMeasurementJobList()
+    if len(job_list) > 0:
+        job_id = job_list[0].performanceMeasurementJobId
+        job_info = real_pm_client.getMeasurementJobInfo(job_id)
+        assert job_info is not None
+        assert isinstance(job_info, pm_obj.MeasurementJob)
+        assert job_info.performanceMeasurementJobId == job_id
+
+
+def test_build_query():
+ # Test query building with different filter combinations
+ filters = {
+ 'host_name': 'controller-0',
+ 'size': 5
+ }
+    query = EsPmClientImp._build_query(**filters)
+
+ # Verify query structure
+ assert query['size'] == 0
+ assert 'query' in query
+ assert 'bool' in query['query']
+ assert 'must' in query['query']['bool']
+
+ # Verify must conditions
+ must_conditions = query['query']['bool']['must']
+ assert {'term': {'type': 'beats'}} in must_conditions
+ assert {'term': {'service.type': 'system'}} in must_conditions
+ assert {'term': {'host.name': 'controller-0'}} in must_conditions
+
+ # Verify aggregations
+ assert 'aggs' in query
+ assert 'unique_datasets' in query['aggs']
+ assert query['aggs']['unique_datasets']['terms']['size'] == 5
+
+
+def test_convert_to_measurement_job():
+    # Test conversion of an ES terms-aggregation bucket
+    bucket = {
+        'key': 'system.cpu',
+        'doc_count': 10
+    }
+
+    job = EsPmClientImp._convert_to_measurement_job(bucket)
+    assert isinstance(job, pm_obj.MeasurementJob)
+    assert job.performanceMeasurementJobId == 'system.cpu'
+    assert job.consumerPerformanceJobId == 'consumer_system.cpu'
+    assert job.measurementSelectionCriteria[0]['sampleCount'] == 10
+
+
+def test_set_stx_client(real_pm_client):
+    # _set_stx_client is currently a placeholder on MeasurementJobClient;
+    # just verify it does not raise an exception
+    client = MeasurementJobClient(None, driver=real_pm_client)
+    client._set_stx_client()
+    assert True