+++ /dev/null
-#!/usr/bin/env python
-#
-# Copyright(c) 2017-2019 Intel Corporation and OPNFV. All rights reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-import json
-import sys
-import base64
-import logging
-import argparse
-import ssl
-from socket import timeout
-
-try:
- import configparser
-except ImportError:
- import ConfigParser as configparser
-
-from distutils.util import strtobool
-from kafka import KafkaConsumer
-
-from normalizer import Normalizer
-from normalizer import CollectdValue
-
-try:
- # For Python 3.0 and later
- import urllib.request as url
-except ImportError:
- # Fall back to Python 2's urllib2
- import urllib2 as url
-
-
-class VESApp(Normalizer):
- """VES Application"""
-
- def __init__(self):
- """Application initialization"""
- self._app_config = {
- 'Domain': '127.0.0.1',
- 'Port': 9999,
- 'Path': '',
- 'Username': 'user',
- 'Password': 'password',
- 'Directory_path': 'events',
- 'UseHttps': False,
- 'SendEventInterval': 10.0,
- 'ApiVersion': 5,
- 'KafkaPort': 9092,
- 'KafkaBroker': 'mykafka'
- }
-
- def send_data(self, event):
- """Send event to VES"""
- server_url = "http{}://{}:{}{}/eventListener/v{}{}".format(
- 's' if self._app_config['UseHttps'] else '',
- self._app_config['Domain'], int(self._app_config['Port']),
- '{}'.format('/{}'.format(self._app_config['Path']) if len(
- self._app_config['Path']) > 0 else ''),
- int(self._app_config['ApiVersion']), '{}'.format(
- '/{}'.format(self._app_config['Directory_path']) if len(
- self._app_config['Directory_path']) > 0 else ''))
- logging.info('Vendor Event Listener is at: {}'.format(server_url))
- credentials = base64.b64encode('{}:{}'.format(
- self._app_config['Username'],
- self._app_config['Password']).encode()).decode()
- logging.info('Authentication credentials are: {}'.format(credentials))
- try:
- request = url.Request(server_url)
- request.add_header('Authorization', 'Basic {}'.format(credentials))
- request.add_header('Content-Type', 'application/json')
- event_str = json.dumps(event).encode()
- logging.debug("Sending {} to {}".format(event_str, server_url))
- ssl._create_default_https_context = ssl._create_unverified_context
- url.urlopen(request, event_str, timeout=1).read().decode('utf-8')
- logging.debug("Sent data to {} successfully".format(server_url))
- except (HTTPError, URLError) as e:
- logging.error('Vendor Event Listener is is not reachable: {}'.format(e))
- except timeout:
- logging.error('socket timed out - URL %s', url)
- except Exception as e:
- logging.error('Vendor Event Listener error: {}'.format(e))
- else:
- logging.info('Access successful.')
-
- def config(self, config):
- """VES option configuration"""
- for key, value in config.items('config'):
- if key in self._app_config:
- try:
- if type(self._app_config[key]) == int:
- value = int(value)
- elif type(self._app_config[key]) == float:
- value = float(value)
- elif type(self._app_config[key]) == bool:
- value = bool(strtobool(value))
-
- if isinstance(value, type(self._app_config[key])):
- self._app_config[key] = value
- else:
- logging.error("Type mismatch with %s" % key)
- sys.exit()
- except ValueError:
- logging.error("Incorrect value type for %s" % key)
- sys.exit()
- else:
- logging.error("Incorrect key configuration %s" % key)
- sys.exit()
-
- def init(self, configfile, schema_file):
- if configfile is not None:
- # read VES configuration file if provided
- config = configparser.ConfigParser()
- config.optionxform = lambda option: option
- config.read(configfile)
- self.config(config)
- # initialize normalizer
- self.initialize(schema_file, self._app_config['SendEventInterval'])
-
- def run(self):
- """Consumer JSON data from kafka broker"""
- kafka_server = '{}:{}'.format(
- self._app_config.get('KafkaBroker'),
- self._app_config.get('KafkaPort'))
- consumer = KafkaConsumer(
- 'collectd', bootstrap_servers=kafka_server,
- auto_offset_reset='latest', enable_auto_commit=False,
- value_deserializer=lambda m: json.loads(m.decode('ascii')))
-
- for message in consumer:
- for kafka_data in message.value:
- # {
- # u'dstypes': [u'derive'],
- # u'plugin': u'cpu',
- # u'dsnames': [u'value'],
- # u'interval': 10.0,
- # u'host': u'localhost',
- # u'values': [99.9978996416267],
- # u'time': 1502114956.244,
- # u'plugin_instance': u'44',
- # u'type_instance': u'idle',
- # u'type': u'cpu'
- # }
- logging.debug('{}:run():data={}'.format(
- self.__class__.__name__, kafka_data))
- for ds_name in kafka_data['dsnames']:
- index = kafka_data['dsnames'].index(ds_name)
- val_hash = CollectdValue.hash_gen(
- kafka_data['host'], kafka_data['plugin'],
- kafka_data['plugin_instance'], kafka_data['type'],
- kafka_data['type_instance'], ds_name)
- collector = self.get_collector()
- val = collector.get(val_hash)
- if val:
- # update the value
- val.value = kafka_data['values'][index]
- val.time = kafka_data['time']
- del(val)
- else:
- # add new value into the collector
- val = CollectdValue()
- val.host = kafka_data['host']
- val.plugin = kafka_data['plugin']
- val.plugin_instance = kafka_data['plugin_instance']
- val.type = kafka_data['type']
- val.type_instance = kafka_data['type_instance']
- val.value = kafka_data['values'][index]
- val.interval = kafka_data['interval']
- val.time = kafka_data['time']
- val.ds_name = ds_name
- collector.add(val)
-
-
-def main():
- # Parsing cmdline options
- parser = argparse.ArgumentParser()
- parser.add_argument("--events-schema", dest="schema", required=True,
- help="YAML events schema definition", metavar="FILE")
- parser.add_argument("--config", dest="configfile", default=None,
- help="Specify config file", metavar="FILE")
- parser.add_argument("--loglevel", dest="level", default='INFO',
- choices=['DEBUG', 'INFO', 'WARNING', 'ERROR'],
- help="Specify log level (default: %(default)s)",
- metavar="LEVEL")
- parser.add_argument("--logfile", dest="logfile", default='agent_app.log',
- help="Specify log file (default: %(default)s)",
- metavar="FILE")
- args = parser.parse_args()
-
- # Create log file
- logging.basicConfig(filename=args.logfile,
- format='%(asctime)s %(message)s',
- level=args.level)
- if args.configfile is None:
- logging.warning("No configfile specified, using default options")
-
- # Create Application Instance
- application_instance = VESApp()
- application_instance.init(args.configfile, args.schema)
-
- try:
- # Run the plugin
- application_instance.run()
- except KeyboardInterrupt:
- logging.info(" - Ctrl-C handled, exiting gracefully")
- except Exception as e:
- logging.error('{}, {}'.format(type(e), e))
- finally:
- application_instance.destroy()
- sys.exit()
-
-
-if __name__ == '__main__':
- main()
+++ /dev/null
-#
-# Copyright(c) 2017 Intel Corporation. All rights reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-# Authors:
-# Volodymyr Mytnyk <volodymyrx.mytnyk@intel.com>
-#
-
-import yaml
-import logging
-import datetime
-import time
-from threading import RLock
-from threading import Timer
-from threading import Thread
-import re
-
-# import YAML loader
-try:
- from yaml import CLoader as Loader
-except ImportError:
- from yaml import Loader
-
-# import synchronized queue
-try:
- import queue
-except ImportError:
- import Queue as queue
-
-
-class Config(object):
- """Configuration class used to pass config option into YAML file"""
-
- def __init__(self, interval):
- self.interval = interval
-
-
-class System(object):
- """System class which provides information like host, time etc., into YAML
- file"""
-
- def __init__(self):
- self.hostname = 'localhost'
- self._id = 0
-
- @property
- def id(self):
- self._id = self._id + 1
- return self._id
-
- @property
- def time(self):
- return time.time()
-
- @property
- def date(self):
- return datetime.date.today().isoformat()
-
-
-class ItemIterator(object):
- """Item iterator returned by Collector class"""
-
- def __init__(self, collector, items):
- """Item iterator init"""
- logging.debug('{}:__init__()'.format(self.__class__.__name__))
- self._items = items
- self._collector = collector
- self._index = 0
-
- def __next__(self):
- """Returns next item from the list"""
- if self._index == len(self._items):
- raise StopIteration
- curr_index = self._index
- self._index = curr_index + 1
- return self.items[curr_index]
-
- def __getitem__(self, key):
- """get item by index"""
- return self._items[key]
-
- def __len__(self):
- """Return length of elements"""
- return len(self._items)
-
- def __del__(self):
- """Destroy iterator and unlock the collector"""
- logging.debug('{}:__del__()'.format(self.__class__.__name__))
- self._collector.unlock()
-
-
-class ItemObject(object):
- """Item object returned by Collector class"""
-
- def __init__(self, collector, hash_):
- """Item object init"""
- logging.debug('{}:__init__()'.format(self.__class__.__name__))
- super(ItemObject, self).__setattr__('_collector', collector)
- super(ItemObject, self).__setattr__('_hash', hash_)
-
- def __setattr__(self, name, value):
- t, item = self._collector._metrics[self._hash]
- logging.debug('{}:__setattr__(name={}, value={})'.format(
- self.__class__.__name__, name, value))
- setattr(item, name, value)
- self._collector._metrics[self._hash] = (time.time(), item)
-
- def __del__(self):
- """Destroy item object and unlock the collector"""
- logging.debug('{}:__del__()'.format(self.__class__.__name__))
- self._collector.unlock()
-
-
-class Collector(object):
- """Thread-safe collector with aging feature"""
-
- def __init__(self, age_timeout):
- """Initialization"""
- self._metrics = {}
- self._lock = RLock()
- self._age_timeout = age_timeout
- self._start_age_timer()
-
- def _start_age_timer(self):
- """Start age timer"""
- self._age_timer = Timer(self._age_timeout, self._on_timer)
- self._age_timer.start()
-
- def _stop_age_timer(self):
- """Stop age timer"""
- self._age_timer.cancel()
-
- def _on_timer(self):
- """Age timer"""
- self._start_age_timer()
- self._check_aging()
-
- def _check_aging(self):
- """Check aging time for all items"""
- self.lock()
- for data_hash, data in list(self._metrics.items()):
- age, item = data
- if ((time.time() - age) >= self._age_timeout):
- # aging time has expired, remove the item from the collector
- logging.debug('{}:_check_aging():value={}'.format(
- self.__class__.__name__, item))
- self._metrics.pop(data_hash)
- del(item)
- self.unlock()
-
- def lock(self):
- """Lock the collector"""
- logging.debug('{}:lock()'.format(self.__class__.__name__))
- self._lock.acquire()
-
- def unlock(self):
- """Unlock the collector"""
- logging.debug('{}:unlock()'.format(self.__class__.__name__))
- self._lock.release()
-
- def get(self, hash_):
- self.lock()
- if hash_ in self._metrics:
- return ItemObject(self, hash_)
- self.unlock()
- return None
-
- def add(self, item):
- """Add an item into the collector"""
- self.lock()
- logging.debug('{}:add(item={})'.format(self.__class__.__name__, item))
- self._metrics[hash(item)] = (time.time(), item)
- self.unlock()
-
- def items(self, select_list=[]):
- """Returns locked (safe) item iterator"""
- metrics = []
- self.lock()
- for k, item in list(self._metrics.items()):
- _, value = item
- for select in select_list:
- if value.match(**select):
- metrics.append(value)
- return ItemIterator(self, metrics)
-
- def destroy(self):
- """Destroy the collector"""
- self._stop_age_timer()
-
-
-class CollectdData(object):
- """Base class for Collectd data"""
-
- def __init__(self, host=None, plugin=None, plugin_instance=None,
- type_=None, type_instance=None, time_=None):
- """Class initialization"""
- self.host = host
- self.plugin = plugin
- self.plugin_instance = plugin_instance
- self.type_instance = type_instance
- self.type = type_
- self.time = time_
-
- @classmethod
- def is_regular_expression(cls, expr):
- return len(expr) > 1 and expr[0] == '/' and expr[-1] == '/'
-
- def match(self, **kargs):
- # compare the metric
- for key, value in list(kargs.items()):
- if self.is_regular_expression(value):
- if re.match(value[1:-1], getattr(self, key)) is None:
- return False
- elif value != getattr(self, key):
- return False
- # return match event if kargs is empty
- return True
-
-
-class CollectdNotification(CollectdData):
- """Collectd notification"""
-
- def __init__(self, host=None, plugin=None, plugin_instance=None,
- type_=None, type_instance=None, severity=None, message=None):
- super(CollectdNotification, self).__init__(
- host, plugin, plugin_instance, type_, type_instance)
- self.severity = severity
- self.message = message
-
- def __repr__(self):
- return '{}(host={}, plugin={}, plugin_instance={}, type={}, ' \
- 'type_instance={}, severity={}, message={}, time={})'.format(
- self.__class__.__name__, self.host, self.plugin,
- self.plugin_instance, self.type, self.type_instance,
- self.severity, self.message, time)
-
-
-class CollectdValue(CollectdData):
- """Collectd value"""
-
- def __init__(self, host=None, plugin=None, plugin_instance=None,
- type_=None, type_instance=None, ds_name='value', value=None,
- interval=None):
- super(CollectdValue, self).__init__(
- host, plugin, plugin_instance, type_, type_instance)
- self.value = value
- self.ds_name = ds_name
- self.interval = interval
-
- @classmethod
- def hash_gen(cls, host, plugin, plugin_instance, type_,
- type_instance, ds_name):
- return hash((host, plugin, plugin_instance, type_,
- type_instance, ds_name))
-
- def __eq__(self, other):
- return hash(self) == hash(other) and self.value == other.value
-
- def __hash__(self):
- return self.hash_gen(self.host, self.plugin, self.plugin_instance,
- self.type, self.type_instance, self.ds_name)
-
- def __repr__(self):
- return '{}(host={}, plugin={}, plugin_instance={}, type={}, ' \
- 'type_instance={}, ds_name={}, value={}, time={})'.format(
- self.__class__.__name__, self.host, self.plugin,
- self.plugin_instance, self.type, self.type_instance,
- self.ds_name, self.value, self.time)
-
-
-class Item(yaml.YAMLObject):
- """Base class to process tags like ArrayItem/ValueItem"""
-
- @classmethod
- def format_node(cls, mapping, metric):
- if mapping.tag in [
- 'tag:yaml.org,2002:str', Bytes2Kibibytes.yaml_tag,
- Number.yaml_tag, StripExtraDash.yaml_tag]:
- return yaml.ScalarNode(mapping.tag, mapping.value.format(**metric))
- elif mapping.tag == 'tag:yaml.org,2002:map':
- values = []
- for key, value in mapping.value:
- values.append((yaml.ScalarNode(key.tag, key.value),
- cls.format_node(value, metric)))
- return yaml.MappingNode(mapping.tag, values)
- elif mapping.tag in [ArrayItem.yaml_tag, ValueItem.yaml_tag]:
- values = []
- for seq in mapping.value:
- map_values = list()
- for key, value in seq.value:
- if key.value == 'SELECT':
- map_values.append((yaml.ScalarNode(key.tag, key.value),
- cls.format_node(value, metric)))
- else:
- map_values.append((yaml.ScalarNode(key.tag, key.value),
- value))
- values.append(yaml.MappingNode(seq.tag, map_values))
- return yaml.SequenceNode(mapping.tag, values)
- elif mapping.tag in [MapValue.yaml_tag]:
- values = []
- for key, value in mapping.value:
- if key.value == 'VALUE':
- values.append((yaml.ScalarNode(key.tag, key.value),
- cls.format_node(value, metric)))
- else:
- values.append((yaml.ScalarNode(key.tag, key.value), value))
- return yaml.MappingNode(mapping.tag, values)
- return mapping
-
-
-class ValueItem(Item):
- """Class to process VlaueItem tag"""
- yaml_tag = '!ValueItem'
-
- @classmethod
- def from_yaml(cls, loader, node):
- logging.debug('{}:from_yaml(loader={})'.format(cls.__name__, loader))
- default, select, value_desc = None, list(), None
- # find value description
- for elem in node.value:
- for key, value in elem.value:
- if key.value == 'VALUE':
- assert value_desc is None, "VALUE key already set"
- value_desc = value
- if key.value == 'SELECT':
- select.append(loader.construct_mapping(value))
- if key.value == 'DEFAULT':
- assert default is None, "DEFAULT key already set"
- default = loader.construct_object(value)
- # if VALUE key isn't given, use default VALUE key
- # format: `VALUE: !Number '{vl.value}'`
- if value_desc is None:
- value_desc = yaml.ScalarNode(tag='!Number', value='{vl.value}')
- # select collectd metric based on SELECT condition
- metrics = loader.collector.items(select)
- assert len(metrics) < 2, \
- 'Wrong SELECT condition {}, selected {} metrics'.format(
- select, len(metrics))
- if len(metrics) > 0:
- item = cls.format_node(value_desc, {'vl': metrics[0],
- 'system': loader.system})
- return loader.construct_object(item)
- # nothing has been found by SELECT condition, set to DEFAULT value.
- assert default is not None, "No metrics selected by SELECT condition" \
- " {} and DEFAULT key isn't set".format(select)
- return default
-
-
-class ArrayItem(Item):
- """Class to process ArrayItem tag"""
- yaml_tag = '!ArrayItem'
-
- @classmethod
- def from_yaml(cls, loader, node):
- logging.debug('{}:process(loader={}, node={})'.format(cls.__name__,
- loader, node))
- # e.g.:
- # SequenceNode(tag=u'!ArrayItem', value=[
- # MappingNode(tag=u'tag:yaml.org,2002:map', value=[
- # (ScalarNode(tag=u'tag:yaml.org,2002:str', value=u'SELECT'),
- # MappingNode(tag=u'tag:yaml.org,2002:map', value=[
- # (ScalarNode(tag=u'tag:yaml.org,2002:str', value=u'plugin'),
- # , ...)
- # ]), ...
- # ), (key, value), ... ])
- # , ... ])
- assert isinstance(node, yaml.SequenceNode), \
- "{} tag isn't YAML array".format(cls.__name__)
- select, index_keys, items, item_desc = list(), list(), list(), None
- for elem in node.value:
- for key, value in elem.value:
- if key.value == 'ITEM-DESC':
- assert item_desc is None, "ITEM-DESC key already set"
- item_desc = value
- if key.value == 'INDEX-KEY':
- assert len(index_keys) == 0, "INDEX-KEY key already set"
- index_keys = loader.construct_sequence(value)
- if key.value == 'SELECT':
- select.append(loader.construct_mapping(value))
- # validate item description
- assert item_desc is not None, "Mandatory ITEM-DESC key isn't set"
- assert len(select) > 0 or len(index_keys) > 0, \
- "Mandatory key (INDEX-KEY or SELECT) isn't set"
- metrics = loader.collector.items(select)
- # select metrics based on INDEX-KEY provided
- if len(index_keys) > 0:
- metric_set = set()
- for metric in metrics:
- value = CollectdValue()
- for key in index_keys:
- setattr(value, key, getattr(metric, key))
- metric_set.add(value)
- metrics = list(metric_set)
- # build items based on SELECT and/or INDEX-KEY criteria
- for metric in metrics:
- item = cls.format_node(item_desc,
- {'vl': metric, 'system': loader.system,
- 'config': loader.config})
- items.append(loader.construct_mapping(item))
- return items
-
-
-class Measurements(ArrayItem):
- """Class to process Measurements tag"""
- yaml_tag = '!Measurements'
-
-
-class Events(Item):
- """Class to process Events tag"""
- yaml_tag = '!Events'
-
- @classmethod
- def from_yaml(cls, loader, node):
- condition, item_desc = dict(), None
- for elem in node.value:
- for key, value in elem.value:
- if key.value == 'ITEM-DESC':
- item_desc = value
- if key.value == 'CONDITION':
- condition = loader.construct_mapping(value)
- assert item_desc is not None, "Mandatory ITEM-DESC key isn't set"
- if loader.notification.match(**condition):
- item = cls.format_node(item_desc, {
- 'n': loader.notification, 'system': loader.system})
- return loader.construct_mapping(item)
- return None
-
-
-class Bytes2Kibibytes(yaml.YAMLObject):
- """Class to process Bytes2Kibibytes tag"""
- yaml_tag = '!Bytes2Kibibytes'
-
- @classmethod
- def from_yaml(cls, loader, node):
- return round(float(node.value) / 1024.0, 3)
-
-
-class Number(yaml.YAMLObject):
- """Class to process Number tag"""
- yaml_tag = '!Number'
-
- @classmethod
- def from_yaml(cls, loader, node):
- try:
- return int(node.value)
- except ValueError:
- return float(node.value)
-
-
-class StripExtraDash(yaml.YAMLObject):
- """Class to process StripExtraDash tag"""
- yaml_tag = '!StripExtraDash'
-
- @classmethod
- def from_yaml(cls, loader, node):
- return '-'.join([x for x in node.value.split('-') if len(x) > 0])
-
-
-class MapValue(yaml.YAMLObject):
- """Class to process MapValue tag"""
- yaml_tag = '!MapValue'
-
- @classmethod
- def from_yaml(cls, loader, node):
- mapping, val = None, None
- for key, value in node.value:
- if key.value == 'TO':
- mapping = loader.construct_mapping(value)
- if key.value == 'VALUE':
- val = loader.construct_object(value)
- assert mapping is not None, "Mandatory TO key isn't set"
- assert val is not None, "Mandatory VALUE key isn't set"
- assert val in mapping, \
- 'Value "{}" cannot be mapped to any of {} values'.format(
- val, list(mapping.keys()))
- return mapping[val]
-
-
-class Normalizer(object):
- """Normalization class which handles events and measurements"""
-
- def __init__(self):
- """Init"""
- self.interval = None
- self.collector = None
- self.system = None
- self.queue = None
- self.timer = None
-
- @classmethod
- def read_configuration(cls, config_file):
- """read YAML configuration file"""
- # load YAML events/measurements definition
- f = open(config_file, 'r')
- doc_yaml = yaml.compose(f)
- f.close()
- # split events & measurements definitions
- measurements, events = list(), list()
- for key, value in doc_yaml.value:
- if value.tag == Measurements.yaml_tag:
- measurements.append((key, value))
- if value.tag == Events.yaml_tag:
- events.append((key, value))
- measurements_yaml = yaml.MappingNode('tag:yaml.org,2002:map',
- measurements)
- measurements_stream = yaml.serialize(measurements_yaml)
- events_yaml = yaml.MappingNode('tag:yaml.org,2002:map', events)
- events_stream = yaml.serialize(events_yaml)
- # return event & measurements definition
- return events_stream, measurements_stream
-
- def initialize(self, config_file, interval):
- """Initialize the class"""
- e, m = self.read_configuration(config_file)
- self.measurements_stream = m
- self.events_stream = e
- self.system = System()
- self.config = Config(interval)
- self.interval = interval
- # start collector with aging time = double interval
- self.collector = Collector(interval * 2)
- # initialize event thread
- self.queue = queue.Queue()
- self.event_thread = Thread(target=self.event_worker)
- self.event_thread.daemon = True
- self.event_thread.start()
- # initialize measurements timer
- self.start_timer()
-
- def destroy(self):
- """Destroy the class"""
- self.collector.destroy()
- self.post_event(None) # send stop event
- self.event_thread.join()
- self.stop_timer()
-
- def start_timer(self):
- """Start measurements timer"""
- self.timer = Timer(self.interval, self.on_timer)
- self.timer.start()
-
- def stop_timer(self):
- """Stop measurements timer"""
- self.timer.cancel()
-
- def on_timer(self):
- """Measurements timer"""
- self.start_timer()
- self.process_measurements()
-
- def event_worker(self):
- """Event worker"""
- while True:
- event = self.queue.get()
- if isinstance(event, CollectdNotification):
- self.process_notify(event)
- continue
- # exit for the worker
- break
-
- def get_collector(self):
- """Get metric collector reference"""
- return self.collector
-
- def process_measurements(self):
- """Process measurements"""
- loader = yaml.Loader(self.measurements_stream)
- setattr(loader, 'collector', self.collector)
- setattr(loader, 'system', self.system)
- setattr(loader, 'config', self.config)
- measurements = loader.get_data()
- for measurement_name in measurements:
- logging.debug('Process "{}" measurements: {}'.format(
- measurement_name, measurements[measurement_name]))
- for measurement in measurements[measurement_name]:
- self.send_data(measurement)
-
- def process_notify(self, notification):
- """Process events"""
- loader = Loader(self.events_stream)
- setattr(loader, 'notification', notification)
- setattr(loader, 'system', self.system)
- notifications = loader.get_data()
- for notify_name in notifications:
- logging.debug('Process "{}" notification'.format(notify_name))
- if notifications[notify_name] is not None:
- self.send_data(notifications[notify_name])
-
- def send_data(self, data):
- """Send data"""
- assert False, 'send_data() is abstract function and MUST be overridden'
-
- def post_event(self, notification):
- """Post notification into the queue to process"""
- self.queue.put(notification)