child_respool = root.addchild(
ResourcePoolWatcher(StxResourcePoolClient(),
self.bus))
+
+ # Add Aggregate watch
child_respool.addchild(
ComputeAggWatcher(ComputeAggClient(), self.bus))
child_respool.addchild(
child_respool.addchild(
UndefinedAggWatcher(UndefinedAggClient(), self.bus))
+ # Add Resource watch
child_pserver = child_respool.addchild(
PServerWatcher(StxPserverClient(), self.bus))
child_pserver.addchild(
child_pserver.addchild(
PServerAccWatcher(StxAccClient(), self.bus))
- self.worker.add_watcher(root)
-
# Add Alarm watch
- root = WatcherTree(
+ child_respool.addchild(
AlarmWatcher(StxAlarmClient(self.bus.uow), self.bus))
+
self.worker.add_watcher(root)
self.worker.start()
return os_client_args
-def get_fm_access_info():
+def get_fm_access_info(subcloud_hostname: str = "",
+ sub_is_https: bool = False):
try:
# client_args = dict(
# auth_url=os.environ.get('OS_AUTH_URL', _DEFAULT_STX_URL),
os_client_args = {}
for key, val in client_args.items():
os_client_args['os_{key}'.format(key=key)] = val
+
auth_url = urlparse(os_client_args.pop('os_auth_url'))
+ os_client_args['auth_url'] = auth_url.geturl()
+
+ if "" != subcloud_hostname:
+ orig_auth_url = urlparse(_DEFAULT_STX_URL)
+ new_auth_url = orig_auth_url._replace(
+ netloc=orig_auth_url.netloc.replace(
+ orig_auth_url.hostname, subcloud_hostname))
+ if sub_is_https:
+ new_auth_url = new_auth_url._replace(
+ scheme=new_auth_url.scheme.
+ replace(new_auth_url.scheme, 'https'))
+ os_client_args['auth_url'] = new_auth_url.geturl()
+ os_client_args['endpoint_type'] = 'publicURL'
os_client_args['insecure'] = True
- os_client_args['auth_url'] = auth_url.geturl()
os_client_args['username'] = os_client_args.pop('os_username')
os_client_args['password'] = os_client_args.pop('os_api_key')
os_client_args['project_name'] = os_client_args.pop('os_project_name')
--- /dev/null
+# Copyright (C) 2022 Wind River Systems, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# -*- coding: utf-8 -*-
+from __future__ import unicode_literals
+
+from flask_restx import fields
+import json
+
+from o2common.helper import o2logging
+logger = o2logging.get_logger(__name__)
+
+
+class Json2Dict(fields.Raw):
+
+ def format(self, value):
+ value2 = None
+ try:
+ value2 = json.loads(value) if value else None
+ except Exception as ex:
+ logger.warning(
+ f"Failed to loads json string: {value}, exception: {str(ex)}")
+ value2 = value
+ return value2
from typing import List # Optional, Set
import uuid as uuid
-# from dcmanagerclient.api import client
-# from cgtsclient.client import get_client as get_stx_client
-# from cgtsclient.exc import EndpointException
-# from dcmanagerclient.api.client import client as get_dc_client
+from cgtsclient.client import get_client as get_stx_client
+from cgtsclient.exc import EndpointException
+from dcmanagerclient.api.client import client as get_dc_client
from fmclient.client import get_client as get_fm_client
from fmclient.common.exceptions import HTTPNotFound
-from o2common.service.client.base_client import BaseClient
+from o2app.adapter import unit_of_work
from o2common.config import config
+from o2common.service.client.base_client import BaseClient
from o2ims.domain import alarm_obj as alarmModel
-from o2app.adapter import unit_of_work
from o2common.helper import o2logging
logger = o2logging.get_logger(__name__)
return self.driver.getAlarmInfo(id)
def _list(self, **filters) -> List[alarmModel.FaultGenericModel]:
- # filters['resourcetypeid']
newmodels = self.driver.getAlarmList(**filters)
uow = self.uow
exist_alarms = {}
if exist_alarms[alarm]:
# exist alarm is active
continue
- event = self._get(alarm)
+ try:
+ event = self._get(alarm)
+ except HTTPNotFound:
+ logger.debug('alarm {} not in this resource pool {}'
+ .format(alarm, self._pool_id))
+ continue
ret.append(event)
return ret
def _set_stx_client(self):
- pass
+ self.driver.setFaultClient(self._pool_id)
class StxEventClient(BaseClient):
return self.driver.getEventList(**filters)
def _set_stx_client(self):
- pass
+ self.driver.setFaultClient(self._pool_id)
# internal driver which implement client call to Stx Fault Management instance
class StxFaultClientImp(object):
- def __init__(self, fm_client=None):
+ def __init__(self, fm_client=None, stx_client=None, dc_client=None):
super().__init__()
self.fmclient = fm_client if fm_client else self.getFmClient()
- # if subcloud_id is not None:
- # self.stxclient = self.getSubcloudClient(subcloud_id)
+ self.stxclient = stx_client if stx_client else self.getStxClient()
+ self.dcclient = dc_client if dc_client else self.getDcmanagerClient()
+
+ def getStxClient(self):
+ os_client_args = config.get_stx_access_info()
+ config_client = get_stx_client(**os_client_args)
+ return config_client
+
+ def getDcmanagerClient(self):
+ os_client_args = config.get_dc_access_info()
+ config_client = get_dc_client(**os_client_args)
+ return config_client
def getFmClient(self):
os_client_args = config.get_fm_access_info()
config_client = get_fm_client(1, **os_client_args)
return config_client
+ def getSubcloudList(self):
+ subs = self.dcclient.subcloud_manager.list_subclouds()
+ known_subs = [sub for sub in subs if sub.sync_status != 'unknown']
+ return known_subs
+
+ def getSubcloudFaultClient(self, subcloud_id):
+ subcloud = self.dcclient.subcloud_manager.\
+ subcloud_additional_details(subcloud_id)
+ logger.debug('subcloud name: %s, oam_floating_ip: %s' %
+ (subcloud[0].name, subcloud[0].oam_floating_ip))
+ try:
+ sub_is_https = False
+ os_client_args = config.get_stx_access_info(
+ region_name=subcloud[0].name,
+ subcloud_hostname=subcloud[0].oam_floating_ip)
+ stx_client = get_stx_client(**os_client_args)
+ except EndpointException as e:
+ msg = e.format_message()
+ if CGTSCLIENT_ENDPOINT_ERROR_MSG in msg:
+ sub_is_https = True
+ os_client_args = config.get_stx_access_info(
+ region_name=subcloud[0].name, sub_is_https=sub_is_https,
+ subcloud_hostname=subcloud[0].oam_floating_ip)
+ stx_client = get_stx_client(**os_client_args)
+ else:
+ raise ValueError('Stx endpoint exception: %s' % msg)
+ except Exception:
+ raise ValueError('cgtsclient get subcloud client failed')
+
+ os_client_args = config.get_fm_access_info(
+ sub_is_https=sub_is_https,
+ subcloud_hostname=subcloud[0].oam_floating_ip)
+ fm_client = get_fm_client(1, **os_client_args)
+
+ return stx_client, fm_client
+
+ def setFaultClient(self, resource_pool_id):
+ systems = self.stxclient.isystem.list()
+ if resource_pool_id == systems[0].uuid:
+ logger.debug('Fault Client not change: %s' % resource_pool_id)
+ self.fmclient = self.getFmClient()
+ return
+
+ subclouds = self.getSubcloudList()
+ for subcloud in subclouds:
+ substxclient, subfaultclient = self.getSubcloudFaultClient(
+ subcloud.subcloud_id)
+ systems = substxclient.isystem.list()
+ if resource_pool_id == systems[0].uuid:
+ self.fmclient = subfaultclient
+
def getAlarmList(self, **filters) -> List[alarmModel.FaultGenericModel]:
alarms = self.fmclient.alarm.list(expand=True)
if len(alarms) == 0:
try:
alarm = self.fmclient.alarm.get(id)
logger.debug('get alarm id ' + id + ':' + str(alarm.to_dict()))
- # print(alarm.to_dict())
except HTTPNotFound:
event = self.fmclient.event_log.get(id)
return alarmModel.FaultGenericModel(
def getEventInfo(self, id) -> alarmModel.FaultGenericModel:
event = self.fmclient.event_log.get(id)
logger.debug('get event id ' + id + ':' + str(event.to_dict()))
- # print(event.to_dict())
return alarmModel.FaultGenericModel(
alarmModel.EventTypeEnum.EVENT, self._eventconverter(event))
# client talking to Stx standalone
import uuid
-from o2common.service.client.base_client import BaseClient
from typing import List
# Optional, Set
-from o2ims.domain import stx_object as ocloudModel
-from o2common.config import config
-from o2ims.domain.resource_type import ResourceTypeEnum
-# from dcmanagerclient.api import client
from cgtsclient.client import get_client as get_stx_client
from cgtsclient.exc import EndpointException
from dcmanagerclient.api.client import client as get_dc_client
+from o2common.config import config
+from o2common.service.client.base_client import BaseClient
+from o2ims.domain import stx_object as ocloudModel
+from o2ims.domain.resource_type import ResourceTypeEnum
+
from o2common.helper import o2logging
logger = o2logging.get_logger(__name__)
os_client_args = config.get_stx_access_info(
region_name=subcloud[0].name,
subcloud_hostname=subcloud[0].oam_floating_ip)
- logger.info(os_client_args)
+ # logger.info(os_client_args)
config_client = get_stx_client(**os_client_args)
except EndpointException as e:
msg = e.format_message()
os_client_args = config.get_stx_access_info(
region_name=subcloud[0].name, sub_is_https=True,
subcloud_hostname=subcloud[0].oam_floating_ip)
- logger.info(os_client_args)
+ # logger.info(os_client_args)
config_client = get_stx_client(**os_client_args)
else:
raise ValueError('Stx endpoint exception: %s' % msg)
Column("name", String(255)),
Column("globalAssetId", String(255)),
Column("parentId", String(255)),
- Column("description", String(255)),
- Column("elements", Text())
- # Column("extensions", String(1024))
+ Column("description", String()),
+ Column("elements", Text()),
+ Column("extensions", String())
)
deploymentmanager = Table(
@dataclass
class UpdateAlarm(UpdateFaultObject):
- pass
+ parentid: str
def __init__(self, resourceId: str, resourceTypeId: str,
resourcePoolId: str, name: str, parentId: str = '',
gAssetId: str = '', elements: str = '',
- description: str = '') -> None:
+ description: str = '', extensions: str = '') -> None:
super().__init__()
self.resourceId = resourceId
self.description = description
self.globalAssetId = gAssetId
self.resourcePoolId = resourcePoolId
self.elements = elements
- self.extensions = []
+ self.extensions = extensions
self.name = name
self.parentId = parentId
+ " id: " + str(fmobj.id)
+ " hash: " + str(fmobj.hash))
with uow:
+ resourcepool = uow.resource_pools.get(cmd.parentid)
+
alarm_event_record = uow.alarm_event_records.get(fmobj.id)
if not alarm_event_record:
logger.info("add alarm event record:" + fmobj.name
# TODO: handle different resource type
hostname = entity_instance_id.split('.')[0].split('=')[1]
logger.debug('hostname: ' + hostname)
- respools = uow.resource_pools.list()
- respoolids = [respool.resourcePoolId for respool in
- respools if respool.oCloudId ==
- respool.resourcePoolId]
+
restype = uow.resource_types.get_by_name('pserver')
localmodel.resourceTypeId = restype.resourceTypeId
- hosts = uow.resources.list(respoolids[0], **{
+ hosts = uow.resources.list(resourcepool.resourcePoolId, **{
'resourceTypeId': restype.resourceTypeId
})
for host in hosts:
# localmodel.resourceId = check_res_id(uow, fmobj)
# logger.debug("resource ID: " + localmodel.resourceId)
# uow.alarm_event_records.add(localmodel)
+ else:
+ restype = uow.resource_types.get_by_name('undefined_aggregate')
+ localmodel.resourceTypeId = restype.resourceTypeId
+
+ undefined_res = uow.resources.list(
+ resourcepool.resourcePoolId, **{
+ 'resourceTypeId': restype.resourceTypeId
+ })
+ localmodel.resourceId = undefined_res[0].resourceId
+ uow.alarm_event_records.add(localmodel)
+ logger.info("Add the alarm event record: " + fmobj.id
+ + ", name: " + fmobj.name)
else:
localmodel = alarm_event_record
# pylint: disable=unused-argument
from __future__ import annotations
import uuid
-# import json
+import json
from o2ims.domain import commands, events
from o2ims.domain.stx_object import StxGenericModel
resourcepool_id = parent.resourcePoolId
parent_id = parent.resourceId
gAssetId = '' # TODO: global ID
- description = "%s : An Accelerator resource of the physical server"\
- % stxobj.name
+ # description = "%s : An Accelerator resource of the physical server"\
+ # % stxobj.name
+ content = json.loads(stxobj.content)
+ selected_keys = [
+ "name", "pdevice", "pciaddr", "pvendor_id", "pvendor",
+ "pclass_id", "pclass", "psvendor", "psdevice",
+ "sriov_totalvfs", "sriov_numvfs", "numa_node"
+ ]
+ filtered = dict(
+ filter(lambda item: item[0] in selected_keys, content.items()))
+ extensions = json.dumps(filtered)
+ description = ";".join([f"{k}:{v}" for k, v in filtered.items()])
resource = Resource(stxobj.id, resourcetype_id, resourcepool_id,
stxobj.name, parent_id, gAssetId, stxobj.content,
- description)
+ description, extensions)
resource.createtime = stxobj.createtime
resource.updatetime = stxobj.updatetime
resource.hash = stxobj.hash
# See the License for the specific language governing permissions and
# limitations under the License.
-# from o2ims.domain.resource_type import ResourceTypeEnum
-from o2common.service.client.base_client import BaseClient
-# from o2ims.domain.stx_object import StxGenericModel
-# from o2common.service.unit_of_work import AbstractUnitOfWork
-from o2common.service.watcher.base import BaseWatcher
+from o2common.domain import tags
from o2common.service.messagebus import MessageBus
+from o2common.service.watcher.base import BaseWatcher
+from o2common.service.client.base_client import BaseClient
+
from o2ims.domain import commands
+from o2ims.domain.stx_object import StxGenericModel
from o2common.helper import o2logging
logger = o2logging.get_logger(__name__)
def __init__(self, fault_client: BaseClient,
bus: MessageBus) -> None:
super().__init__(fault_client, bus)
+ self._tags = tags.Tag()
+ self.poolid = None
def _targetname(self):
return "alarm"
- def _probe(self, parent: object = None, tags: object = None):
- newmodels = self._client.list()
- # if len(newmodels) == 0:
- # return []
-
- # uow = self._bus.uow
- # exist_alarms = {}
- # with uow:
- # rs = uow.session.execute(
- # '''
- # SELECT "alarmEventRecordId"
- # FROM "alarmEventRecord"
- # WHERE "perceivedSeverity" != :perceived_severity_enum
- # ''',
- # dict(perceived_severity_enum=alarm_obj.PerceivedSeverityEnum.
- # CLEARED)
- # )
- # for row in rs:
- # id = row[0]
- # # logger.debug('Exist alarm: ' + id)
- # exist_alarms[id] = False
+ def _probe(self, parent: StxGenericModel, tags: object = None):
+ # Set a tag for children resource
+ self._tags.pool = parent.res_pool_id
+ self._set_respool_client()
- # ret = []
- # for m in newmodels:
- # try:
- # if exist_alarms[m.id]:
- # ret.append(commands.UpdateAlarm(m))
- # exist_alarms[m.id] = True
- # except KeyError:
- # logger.debug('alarm new: ' + m.id)
- # ret.append(commands.UpdateAlarm(m))
-
- # for alarm in exist_alarms:
- # logger.debug('exist alarm: ' + alarm)
- # if exist_alarms[alarm]:
- # # exist alarm is active
- # continue
- # event = self._client.get(alarm)
- # ret.append(commands.UpdateAlarm(event))
-
- # return ret
-
- return [commands.UpdateAlarm(m) for m in newmodels] \
+ resourcepoolid = parent.id
+ newmodels = self._client.list()
+ return [commands.UpdateAlarm(m, resourcepoolid) for m in newmodels] \
if len(newmodels) > 0 else []
-
-# class EventWatcher(BaseWatcher):
-# def __init__(self, fault_client: BaseClient,
-# bus: MessageBus) -> None:
-# super().__init__(fault_client, bus)
-
-# def _targetname(self):
-# return "event"
-
-# def _probe(self, parent: object = None, tags: object = None):
-# newmodels = self._client.list()
-# return [commands.UpdateAlarm(m) for m in newmodels] \
-# if len(newmodels) > 0 else []
+ def _set_respool_client(self):
+ self.poolid = self._tags.pool
+ self._client.set_pool_driver(self.poolid)
from flask_restx import fields
from o2ims.views.api_ns import api_ims_inventory as api_ims_inventory_v1
+from o2common.views.flask_restx_fields import Json2Dict
class OcloudDTO:
'parentId': fields.String,
'description': fields.String,
# 'elements': fields.String,
- 'extensions': fields.String
+ # 'extensions': fields.String
+ 'extensions': Json2Dict(attribute='extensions')
+ # 'extensions': fields.Raw(attribute='extensions')
},
mask='{resourceId,resourcePoolId,resourceTypeId,description,parentId}'
)
'parentId': fields.String,
'description': fields.String,
# 'elements': fields.String,
- 'extensions': fields.String
+ # 'extensions': fields.String
+ 'extensions': Json2Dict(attribute='extensions')
+ # 'extensions': fields.Raw(attribute='extensions')
}
if iteration_number:
resource_json_mapping['elements'] = fields.List(
event2 = fmClientImp.getEventInfo(event1.id)
assert event1 != event2
assert event1.id == event2.id
+
+
+def test_get_subcloud_alarmlist(real_stx_fm_client, real_stx_dc_client):
+ fmClientImp = StxFaultClientImp(
+ real_stx_fm_client, dc_client=real_stx_dc_client)
+ assert fmClientImp is not None
+ subclouds = fmClientImp.getSubcloudList()
+ stxclient, _ = fmClientImp.getSubcloudFaultClient(subclouds[0].subcloud_id)
+ res_pool_id = stxclient.isystem.list()[0].uuid
+
+ fmClientImp.setFaultClient(res_pool_id)
+ alarms = fmClientImp.getAlarmList()
+ assert alarms is not None
+ assert len(alarms) > 0
bus = create_alarm_fake_bus(fakeuow)
fakeClient = FakeAlarmClient()
alarmwatcher = AlarmWatcher(fakeClient, bus)
- cmds = alarmwatcher.probe()
+ parent = type('obj', (object,), {
+ 'data': type('obj', (object,), {
+ 'id': 'test_parent_id',
+ 'res_pool_id': 'test_res_pool'
+ })})
+ cmds = alarmwatcher.probe(parent)
assert cmds is not None
assert len(cmds) == 1
assert cmds[0].data.name == "alarm"