child_respool = root.addchild(
ResourcePoolWatcher(StxResourcePoolClient(),
self.bus))
+
+ # Add Aggregate watch
child_respool.addchild(
ComputeAggWatcher(ComputeAggClient(), self.bus))
child_respool.addchild(
child_respool.addchild(
UndefinedAggWatcher(UndefinedAggClient(), self.bus))
+ # Add Resource watch
child_pserver = child_respool.addchild(
PServerWatcher(StxPserverClient(), self.bus))
child_pserver.addchild(
child_pserver.addchild(
PServerAccWatcher(StxAccClient(), self.bus))
- self.worker.add_watcher(root)
-
# Add Alarm watch
- root = WatcherTree(
+ child_respool.addchild(
AlarmWatcher(StxAlarmClient(self.bus.uow), self.bus))
+
self.worker.add_watcher(root)
self.worker.start()
return os_client_args
-def get_fm_access_info():
+def get_fm_access_info(subcloud_hostname: str = "",
+ sub_is_https: bool = False):
try:
# client_args = dict(
# auth_url=os.environ.get('OS_AUTH_URL', _DEFAULT_STX_URL),
os_client_args = {}
for key, val in client_args.items():
os_client_args['os_{key}'.format(key=key)] = val
+
auth_url = urlparse(os_client_args.pop('os_auth_url'))
+ os_client_args['auth_url'] = auth_url.geturl()
+
+ if "" != subcloud_hostname:
+ orig_auth_url = urlparse(_DEFAULT_STX_URL)
+ new_auth_url = orig_auth_url._replace(
+ netloc=orig_auth_url.netloc.replace(
+ orig_auth_url.hostname, subcloud_hostname))
+ if sub_is_https:
+ new_auth_url = new_auth_url._replace(
+ scheme=new_auth_url.scheme.
+ replace(new_auth_url.scheme, 'https'))
+ os_client_args['auth_url'] = new_auth_url.geturl()
+ os_client_args['endpoint_type'] = 'publicURL'
os_client_args['insecure'] = True
- os_client_args['auth_url'] = auth_url.geturl()
os_client_args['username'] = os_client_args.pop('os_username')
os_client_args['password'] = os_client_args.pop('os_api_key')
os_client_args['project_name'] = os_client_args.pop('os_project_name')
from typing import List # Optional, Set
import uuid as uuid
-# from dcmanagerclient.api import client
-# from cgtsclient.client import get_client as get_stx_client
-# from cgtsclient.exc import EndpointException
-# from dcmanagerclient.api.client import client as get_dc_client
+from cgtsclient.client import get_client as get_stx_client
+from cgtsclient.exc import EndpointException
+from dcmanagerclient.api.client import client as get_dc_client
from fmclient.client import get_client as get_fm_client
from fmclient.common.exceptions import HTTPNotFound
-from o2common.service.client.base_client import BaseClient
+from o2app.adapter import unit_of_work
from o2common.config import config
+from o2common.service.client.base_client import BaseClient
from o2ims.domain import alarm_obj as alarmModel
-from o2app.adapter import unit_of_work
from o2common.helper import o2logging
logger = o2logging.get_logger(__name__)
return self.driver.getAlarmInfo(id)
def _list(self, **filters) -> List[alarmModel.FaultGenericModel]:
- # filters['resourcetypeid']
newmodels = self.driver.getAlarmList(**filters)
uow = self.uow
exist_alarms = {}
if exist_alarms[alarm]:
# exist alarm is active
continue
- event = self._get(alarm)
+ try:
+ event = self._get(alarm)
+ except HTTPNotFound:
+ logger.debug('alarm {} not in this resource pool {}'
+ .format(alarm, self._pool_id))
+ continue
ret.append(event)
return ret
def _set_stx_client(self):
- pass
+ self.driver.setFaultClient(self._pool_id)
class StxEventClient(BaseClient):
return self.driver.getEventList(**filters)
def _set_stx_client(self):
- pass
+ self.driver.setFaultClient(self._pool_id)
# internal driver which implement client call to Stx Fault Management instance
class StxFaultClientImp(object):
- def __init__(self, fm_client=None):
+ def __init__(self, fm_client=None, stx_client=None, dc_client=None):
super().__init__()
self.fmclient = fm_client if fm_client else self.getFmClient()
- # if subcloud_id is not None:
- # self.stxclient = self.getSubcloudClient(subcloud_id)
+ self.stxclient = stx_client if stx_client else self.getStxClient()
+ self.dcclient = dc_client if dc_client else self.getDcmanagerClient()
+
+ def getStxClient(self):
+ os_client_args = config.get_stx_access_info()
+ config_client = get_stx_client(**os_client_args)
+ return config_client
+
+ def getDcmanagerClient(self):
+ os_client_args = config.get_dc_access_info()
+ config_client = get_dc_client(**os_client_args)
+ return config_client
def getFmClient(self):
os_client_args = config.get_fm_access_info()
config_client = get_fm_client(1, **os_client_args)
return config_client
+ def getSubcloudList(self):
+ subs = self.dcclient.subcloud_manager.list_subclouds()
+ known_subs = [sub for sub in subs if sub.sync_status != 'unknown']
+ return known_subs
+
+ def getSubcloudFaultClient(self, subcloud_id):
+ subcloud = self.dcclient.subcloud_manager.\
+ subcloud_additional_details(subcloud_id)
+ logger.debug('subcloud name: %s, oam_floating_ip: %s' %
+ (subcloud[0].name, subcloud[0].oam_floating_ip))
+ try:
+ sub_is_https = False
+ os_client_args = config.get_stx_access_info(
+ region_name=subcloud[0].name,
+ subcloud_hostname=subcloud[0].oam_floating_ip)
+ stx_client = get_stx_client(**os_client_args)
+ except EndpointException as e:
+ msg = e.format_message()
+ if CGTSCLIENT_ENDPOINT_ERROR_MSG in msg:
+ sub_is_https = True
+ os_client_args = config.get_stx_access_info(
+ region_name=subcloud[0].name, sub_is_https=sub_is_https,
+ subcloud_hostname=subcloud[0].oam_floating_ip)
+ stx_client = get_stx_client(**os_client_args)
+ else:
+ raise ValueError('Stx endpoint exception: %s' % msg)
+ except Exception:
+ raise ValueError('cgtsclient get subcloud client failed')
+
+ os_client_args = config.get_fm_access_info(
+ sub_is_https=sub_is_https,
+ subcloud_hostname=subcloud[0].oam_floating_ip)
+ fm_client = get_fm_client(1, **os_client_args)
+
+ return stx_client, fm_client
+
+ def setFaultClient(self, resource_pool_id):
+ systems = self.stxclient.isystem.list()
+ if resource_pool_id == systems[0].uuid:
+ logger.debug('Fault Client not change: %s' % resource_pool_id)
+ self.fmclient = self.getFmClient()
+ return
+
+ subclouds = self.getSubcloudList()
+ for subcloud in subclouds:
+ substxclient, subfaultclient = self.getSubcloudFaultClient(
+ subcloud.subcloud_id)
+ systems = substxclient.isystem.list()
+ if resource_pool_id == systems[0].uuid:
+ self.fmclient = subfaultclient
+
def getAlarmList(self, **filters) -> List[alarmModel.FaultGenericModel]:
alarms = self.fmclient.alarm.list(expand=True)
if len(alarms) == 0:
try:
alarm = self.fmclient.alarm.get(id)
logger.debug('get alarm id ' + id + ':' + str(alarm.to_dict()))
- # print(alarm.to_dict())
except HTTPNotFound:
event = self.fmclient.event_log.get(id)
return alarmModel.FaultGenericModel(
def getEventInfo(self, id) -> alarmModel.FaultGenericModel:
event = self.fmclient.event_log.get(id)
logger.debug('get event id ' + id + ':' + str(event.to_dict()))
- # print(event.to_dict())
return alarmModel.FaultGenericModel(
alarmModel.EventTypeEnum.EVENT, self._eventconverter(event))
# client talking to Stx standalone
import uuid
-from o2common.service.client.base_client import BaseClient
from typing import List
# Optional, Set
-from o2ims.domain import stx_object as ocloudModel
-from o2common.config import config
-from o2ims.domain.resource_type import ResourceTypeEnum
-# from dcmanagerclient.api import client
from cgtsclient.client import get_client as get_stx_client
from cgtsclient.exc import EndpointException
from dcmanagerclient.api.client import client as get_dc_client
+from o2common.config import config
+from o2common.service.client.base_client import BaseClient
+from o2ims.domain import stx_object as ocloudModel
+from o2ims.domain.resource_type import ResourceTypeEnum
+
from o2common.helper import o2logging
logger = o2logging.get_logger(__name__)
os_client_args = config.get_stx_access_info(
region_name=subcloud[0].name,
subcloud_hostname=subcloud[0].oam_floating_ip)
- logger.info(os_client_args)
+ # logger.info(os_client_args)
config_client = get_stx_client(**os_client_args)
except EndpointException as e:
msg = e.format_message()
os_client_args = config.get_stx_access_info(
region_name=subcloud[0].name, sub_is_https=True,
subcloud_hostname=subcloud[0].oam_floating_ip)
- logger.info(os_client_args)
+ # logger.info(os_client_args)
config_client = get_stx_client(**os_client_args)
else:
raise ValueError('Stx endpoint exception: %s' % msg)
@dataclass
class UpdateAlarm(UpdateFaultObject):
- pass
+ parentid: str
+ " id: " + str(fmobj.id)
+ " hash: " + str(fmobj.hash))
with uow:
+ resourcepool = uow.resource_pools.get(cmd.parentid)
+
alarm_event_record = uow.alarm_event_records.get(fmobj.id)
if not alarm_event_record:
logger.info("add alarm event record:" + fmobj.name
# TODO: handle different resource type
hostname = entity_instance_id.split('.')[0].split('=')[1]
logger.debug('hostname: ' + hostname)
- respools = uow.resource_pools.list()
- respoolids = [respool.resourcePoolId for respool in
- respools if respool.oCloudId ==
- respool.resourcePoolId]
+
restype = uow.resource_types.get_by_name('pserver')
localmodel.resourceTypeId = restype.resourceTypeId
- hosts = uow.resources.list(respoolids[0], **{
+ hosts = uow.resources.list(resourcepool.resourcePoolId, **{
'resourceTypeId': restype.resourceTypeId
})
for host in hosts:
# localmodel.resourceId = check_res_id(uow, fmobj)
# logger.debug("resource ID: " + localmodel.resourceId)
# uow.alarm_event_records.add(localmodel)
+ else:
+ restype = uow.resource_types.get_by_name('undefined_aggregate')
+ localmodel.resourceTypeId = restype.resourceTypeId
+
+ undefined_res = uow.resources.list(
+ resourcepool.resourcePoolId, **{
+ 'resourceTypeId': restype.resourceTypeId
+ })
+ localmodel.resourceId = undefined_res[0].resourceId
+ uow.alarm_event_records.add(localmodel)
+ logger.info("Add the alarm event record: " + fmobj.id
+ + ", name: " + fmobj.name)
else:
localmodel = alarm_event_record
# See the License for the specific language governing permissions and
# limitations under the License.
-# from o2ims.domain.resource_type import ResourceTypeEnum
-from o2common.service.client.base_client import BaseClient
-# from o2ims.domain.stx_object import StxGenericModel
-# from o2common.service.unit_of_work import AbstractUnitOfWork
-from o2common.service.watcher.base import BaseWatcher
+from o2common.domain import tags
from o2common.service.messagebus import MessageBus
+from o2common.service.watcher.base import BaseWatcher
+from o2common.service.client.base_client import BaseClient
+
from o2ims.domain import commands
+from o2ims.domain.stx_object import StxGenericModel
from o2common.helper import o2logging
logger = o2logging.get_logger(__name__)
def __init__(self, fault_client: BaseClient,
bus: MessageBus) -> None:
super().__init__(fault_client, bus)
+ self._tags = tags.Tag()
+ self.poolid = None
def _targetname(self):
return "alarm"
- def _probe(self, parent: object = None, tags: object = None):
- newmodels = self._client.list()
- # if len(newmodels) == 0:
- # return []
-
- # uow = self._bus.uow
- # exist_alarms = {}
- # with uow:
- # rs = uow.session.execute(
- # '''
- # SELECT "alarmEventRecordId"
- # FROM "alarmEventRecord"
- # WHERE "perceivedSeverity" != :perceived_severity_enum
- # ''',
- # dict(perceived_severity_enum=alarm_obj.PerceivedSeverityEnum.
- # CLEARED)
- # )
- # for row in rs:
- # id = row[0]
- # # logger.debug('Exist alarm: ' + id)
- # exist_alarms[id] = False
+ def _probe(self, parent: StxGenericModel, tags: object = None):
+ # Set a tag for children resource
+ self._tags.pool = parent.res_pool_id
+ self._set_respool_client()
- # ret = []
- # for m in newmodels:
- # try:
- # if exist_alarms[m.id]:
- # ret.append(commands.UpdateAlarm(m))
- # exist_alarms[m.id] = True
- # except KeyError:
- # logger.debug('alarm new: ' + m.id)
- # ret.append(commands.UpdateAlarm(m))
-
- # for alarm in exist_alarms:
- # logger.debug('exist alarm: ' + alarm)
- # if exist_alarms[alarm]:
- # # exist alarm is active
- # continue
- # event = self._client.get(alarm)
- # ret.append(commands.UpdateAlarm(event))
-
- # return ret
-
- return [commands.UpdateAlarm(m) for m in newmodels] \
+ resourcepoolid = parent.id
+ newmodels = self._client.list()
+ return [commands.UpdateAlarm(m, resourcepoolid) for m in newmodels] \
if len(newmodels) > 0 else []
-
-# class EventWatcher(BaseWatcher):
-# def __init__(self, fault_client: BaseClient,
-# bus: MessageBus) -> None:
-# super().__init__(fault_client, bus)
-
-# def _targetname(self):
-# return "event"
-
-# def _probe(self, parent: object = None, tags: object = None):
-# newmodels = self._client.list()
-# return [commands.UpdateAlarm(m) for m in newmodels] \
-# if len(newmodels) > 0 else []
+ def _set_respool_client(self):
+ self.poolid = self._tags.pool
+ self._client.set_pool_driver(self.poolid)
event2 = fmClientImp.getEventInfo(event1.id)
assert event1 != event2
assert event1.id == event2.id
+
+
+def test_get_subcloud_alarmlist(real_stx_fm_client, real_stx_dc_client):
+ fmClientImp = StxFaultClientImp(
+ real_stx_fm_client, dc_client=real_stx_dc_client)
+ assert fmClientImp is not None
+ subclouds = fmClientImp.getSubcloudList()
+ stxclient, _ = fmClientImp.getSubcloudFaultClient(subclouds[0].subcloud_id)
+ res_pool_id = stxclient.isystem.list()[0].uuid
+
+ fmClientImp.setFaultClient(res_pool_id)
+ alarms = fmClientImp.getAlarmList()
+ assert alarms is not None
+ assert len(alarms) > 0
bus = create_alarm_fake_bus(fakeuow)
fakeClient = FakeAlarmClient()
alarmwatcher = AlarmWatcher(fakeClient, bus)
- cmds = alarmwatcher.probe()
+ parent = type('obj', (object,), {
+ 'data': type('obj', (object,), {
+ 'id': 'test_parent_id',
+ 'res_pool_id': 'test_res_pool'
+ })})
+ cmds = alarmwatcher.probe(parent)
assert cmds is not None
assert len(cmds) == 1
assert cmds[0].data.name == "alarm"