From: santanude Date: Mon, 27 Jun 2022 13:57:34 +0000 (+0530) Subject: Clean up f-release branch X-Git-Tag: 6.0.2~14^2 X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=commitdiff_plain;h=c45a39f87868cae21f30b9108a8fce7142c33e7a;p=smo%2Fves.git Clean up f-release branch SMO-69 Signed-off-by: santanude Change-Id: I1206c01a5a0e0554b08b492967f54e8729c558af Signed-off-by: santanude --- diff --git a/Makefile b/Makefile index 51c21d1..7761d7d 100755 --- a/Makefile +++ b/Makefile @@ -17,7 +17,6 @@ default: all all: - cd agent; make cd collector; make cd kafka; make cd dmaapadapter; make diff --git a/README.md b/README.md index 46b15df..57af7c8 100755 --- a/README.md +++ b/README.md @@ -17,14 +17,17 @@ installed on the machine, where you want to run these containers. To build the solution, you need to do the following in the current folder. - % make + % docker-compose build + +or simply by the following make command + + % make ## Run: To run the solution, you need to invoke the following command - % docker-compose up -d smo-collector - % docker-compose up -d agent + % docker-compose up -d or simply by the following make command @@ -32,8 +35,7 @@ or simply by the following make command To stop the solution the following command should be invoked. - % docker-compose down -d smo-collector - % docker-compose down -d agent + % docker-compose down or simply by the following make command diff --git a/agent/Dockerfile b/agent/Dockerfile deleted file mode 100755 index 432f679..0000000 --- a/agent/Dockerfile +++ /dev/null @@ -1,43 +0,0 @@ -# Original work Copyright 2017 AT&T Intellectual Property, Inc -# Modified work Copyright 2021 Xoriant Corporation -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -# What this is: A Dockerfile for building an OPFNV VES Agent container image. -# -# Status: this is a work in progress, under test. -# - -FROM ubuntu:focal - -RUN mkdir /opt/smo - -RUN apt-get update && apt-get -y upgrade -RUN apt-get install -y tzdata -# Required for kafka: default-jre zookeeperd python-pip kafka-python -# Required for building librdkafka: git build-essential -# libpthread-stubs0-dev libssl-dev libsasl2-dev liblz4-dev -# (or libz-dev?) -# Required for building collectd: pkg-config -RUN apt-get install -y netcat -RUN apt-get install -y default-jre zookeeperd \ -python3 python3-pip pkg-config git build-essential libpthread-stubs0-dev \ -libssl-dev libsasl2-dev liblz4-dev libz-dev -RUN pip3 install kafka-python pyaml -RUN pip3 install --upgrade certifi - -RUN mkdir /opt/smo/barometer -ADD barometer /opt/smo/barometer - -COPY start.sh /opt/smo/start.sh -ENTRYPOINT ["/bin/bash", "/opt/smo/start.sh"] diff --git a/agent/Makefile b/agent/Makefile deleted file mode 100755 index 7238da8..0000000 --- a/agent/Makefile +++ /dev/null @@ -1,19 +0,0 @@ -# Copyright 2021 Xoriant Corporation -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -default: all - -all: - docker build -t agent . diff --git a/agent/barometer/3rd_party/collectd-agent-app/LICENSE b/agent/barometer/3rd_party/collectd-agent-app/LICENSE deleted file mode 100755 index adc80a3..0000000 --- a/agent/barometer/3rd_party/collectd-agent-app/LICENSE +++ /dev/null @@ -1,24 +0,0 @@ -Original work Copyright 2016-2017 Intel Corporation -Modified work Copyright 2021 Xoriant Corporation - -Permission is hereby granted, free of charge, to any person obtaining a copy of -this software and associated documentation files (the "Software"), to deal in -the Software without restriction, including without limitation the rights to -use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies -of the Software, and to permit persons to whom the Software is furnished to do -so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in all -copies or substantial portions of the Software. - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. diff --git a/agent/barometer/3rd_party/collectd-agent-app/agent_app/agent_app.py b/agent/barometer/3rd_party/collectd-agent-app/agent_app/agent_app.py deleted file mode 100755 index cac186c..0000000 --- a/agent/barometer/3rd_party/collectd-agent-app/agent_app/agent_app.py +++ /dev/null @@ -1,225 +0,0 @@ -#!/usr/bin/env python -# -# Copyright(c) 2017-2019 Intel Corporation and OPNFV. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -import json -import sys -import base64 -import logging -import argparse -import ssl -from socket import timeout - -try: - import configparser -except ImportError: - import ConfigParser as configparser - -from distutils.util import strtobool -from kafka import KafkaConsumer - -from normalizer import Normalizer -from normalizer import CollectdValue - -try: - # For Python 3.0 and later - import urllib.request as url -except ImportError: - # Fall back to Python 2's urllib2 - import urllib2 as url - - -class VESApp(Normalizer): - """VES Application""" - - def __init__(self): - """Application initialization""" - self._app_config = { - 'Domain': '127.0.0.1', - 'Port': 9999, - 'Path': '', - 'Username': 'user', - 'Password': 'password', - 'Directory_path': 'events', - 'UseHttps': False, - 'SendEventInterval': 10.0, - 'ApiVersion': 5, - 'KafkaPort': 9092, - 'KafkaBroker': 'mykafka' - } - - def send_data(self, event): - """Send event to VES""" - server_url = "http{}://{}:{}{}/eventListener/v{}{}".format( - 's' if self._app_config['UseHttps'] else '', - self._app_config['Domain'], int(self._app_config['Port']), - '{}'.format('/{}'.format(self._app_config['Path']) if len( - self._app_config['Path']) > 0 else ''), - int(self._app_config['ApiVersion']), '{}'.format( - '/{}'.format(self._app_config['Directory_path']) if len( - self._app_config['Directory_path']) > 0 else '')) - logging.info('Vendor Event Listener is at: {}'.format(server_url)) - credentials = base64.b64encode('{}:{}'.format( - self._app_config['Username'], - self._app_config['Password']).encode()).decode() - logging.info('Authentication credentials are: {}'.format(credentials)) - try: - request = url.Request(server_url) - request.add_header('Authorization', 'Basic {}'.format(credentials)) - request.add_header('Content-Type', 'application/json') - event_str = json.dumps(event).encode() - logging.debug("Sending {} to {}".format(event_str, server_url)) - ssl._create_default_https_context = ssl._create_unverified_context - url.urlopen(request, event_str, timeout=1).read().decode('utf-8') - logging.debug("Sent data to {} successfully".format(server_url)) - except (HTTPError, URLError) as e: - logging.error('Vendor Event Listener is is not reachable: {}'.format(e)) - except timeout: - logging.error('socket timed out - URL %s', url) - except Exception as e: - logging.error('Vendor Event Listener error: {}'.format(e)) - else: - logging.info('Access successful.') - - def config(self, config): - """VES option configuration""" - for key, value in config.items('config'): - if key in self._app_config: - try: - if type(self._app_config[key]) == int: - value = int(value) - elif type(self._app_config[key]) == float: - value = float(value) - elif type(self._app_config[key]) == bool: - value = bool(strtobool(value)) - - if isinstance(value, type(self._app_config[key])): - self._app_config[key] = value - else: - logging.error("Type mismatch with %s" % key) - sys.exit() - except ValueError: - logging.error("Incorrect value type for %s" % key) - sys.exit() - else: - logging.error("Incorrect key configuration %s" % key) - sys.exit() - - def init(self, configfile, schema_file): - if configfile is not None: - # read VES configuration file if provided - config = configparser.ConfigParser() - config.optionxform = lambda option: option - config.read(configfile) - self.config(config) - # initialize normalizer - self.initialize(schema_file, self._app_config['SendEventInterval']) - - def run(self): - """Consumer JSON data from kafka broker""" - kafka_server = '{}:{}'.format( - self._app_config.get('KafkaBroker'), - self._app_config.get('KafkaPort')) - consumer = KafkaConsumer( - 'collectd', bootstrap_servers=kafka_server, - auto_offset_reset='latest', enable_auto_commit=False, - value_deserializer=lambda m: json.loads(m.decode('ascii'))) - - for message in consumer: - for kafka_data in message.value: - # { - # u'dstypes': [u'derive'], - # u'plugin': u'cpu', - # u'dsnames': [u'value'], - # u'interval': 10.0, - # u'host': u'localhost', - # u'values': [99.9978996416267], - # u'time': 1502114956.244, - # u'plugin_instance': u'44', - # u'type_instance': u'idle', - # u'type': u'cpu' - # } - logging.debug('{}:run():data={}'.format( - self.__class__.__name__, kafka_data)) - for ds_name in kafka_data['dsnames']: - index = kafka_data['dsnames'].index(ds_name) - val_hash = CollectdValue.hash_gen( - kafka_data['host'], kafka_data['plugin'], - kafka_data['plugin_instance'], kafka_data['type'], - kafka_data['type_instance'], ds_name) - collector = self.get_collector() - val = collector.get(val_hash) - if val: - # update the value - val.value = kafka_data['values'][index] - val.time = kafka_data['time'] - del(val) - else: - # add new value into the collector - val = CollectdValue() - val.host = kafka_data['host'] - val.plugin = kafka_data['plugin'] - val.plugin_instance = kafka_data['plugin_instance'] - val.type = kafka_data['type'] - val.type_instance = kafka_data['type_instance'] - val.value = kafka_data['values'][index] - val.interval = kafka_data['interval'] - val.time = kafka_data['time'] - val.ds_name = ds_name - collector.add(val) - - -def main(): - # Parsing cmdline options - parser = argparse.ArgumentParser() - parser.add_argument("--events-schema", dest="schema", required=True, - help="YAML events schema definition", metavar="FILE") - parser.add_argument("--config", dest="configfile", default=None, - help="Specify config file", metavar="FILE") - parser.add_argument("--loglevel", dest="level", default='INFO', - choices=['DEBUG', 'INFO', 'WARNING', 'ERROR'], - help="Specify log level (default: %(default)s)", - metavar="LEVEL") - parser.add_argument("--logfile", dest="logfile", default='agent_app.log', - help="Specify log file (default: %(default)s)", - metavar="FILE") - args = parser.parse_args() - - # Create log file - logging.basicConfig(filename=args.logfile, - format='%(asctime)s %(message)s', - level=args.level) - if args.configfile is None: - logging.warning("No configfile specified, using default options") - - # Create Application Instance - application_instance = VESApp() - application_instance.init(args.configfile, args.schema) - - try: - # Run the plugin - application_instance.run() - except KeyboardInterrupt: - logging.info(" - Ctrl-C handled, exiting gracefully") - except Exception as e: - logging.error('{}, {}'.format(type(e), e)) - finally: - application_instance.destroy() - sys.exit() - - -if __name__ == '__main__': - main() diff --git a/agent/barometer/3rd_party/collectd-agent-app/agent_app/config/agent_app_config.conf b/agent/barometer/3rd_party/collectd-agent-app/agent_app/config/agent_app_config.conf deleted file mode 100755 index 17a4099..0000000 --- a/agent/barometer/3rd_party/collectd-agent-app/agent_app/config/agent_app_config.conf +++ /dev/null @@ -1,12 +0,0 @@ -[config] -Domain = 127.0.0.1 -Path = -Port = 9999 -Directory_path = events -UseHttps = True -Username = user -Password = password -SendEventInterval = 10 -ApiVersion = 7 -KafkaPort = 9092 -KafkaBroker = 127.0.0.1 diff --git a/agent/barometer/3rd_party/collectd-agent-app/agent_app/normalizer.py b/agent/barometer/3rd_party/collectd-agent-app/agent_app/normalizer.py deleted file mode 100755 index a4c5fe9..0000000 --- a/agent/barometer/3rd_party/collectd-agent-app/agent_app/normalizer.py +++ /dev/null @@ -1,606 +0,0 @@ -# -# Copyright(c) 2017 Intel Corporation. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -# Authors: -# Volodymyr Mytnyk -# - -import yaml -import logging -import datetime -import time -from threading import RLock -from threading import Timer -from threading import Thread -import re - -# import YAML loader -try: - from yaml import CLoader as Loader -except ImportError: - from yaml import Loader - -# import synchronized queue -try: - import queue -except ImportError: - import Queue as queue - - -class Config(object): - """Configuration class used to pass config option into YAML file""" - - def __init__(self, interval): - self.interval = interval - - -class System(object): - """System class which provides information like host, time etc., into YAML - file""" - - def __init__(self): - self.hostname = 'localhost' - self._id = 0 - - @property - def id(self): - self._id = self._id + 1 - return self._id - - @property - def time(self): - return time.time() - - @property - def date(self): - return datetime.date.today().isoformat() - - -class ItemIterator(object): - """Item iterator returned by Collector class""" - - def __init__(self, collector, items): - """Item iterator init""" - logging.debug('{}:__init__()'.format(self.__class__.__name__)) - self._items = items - self._collector = collector - self._index = 0 - - def __next__(self): - """Returns next item from the list""" - if self._index == len(self._items): - raise StopIteration - curr_index = self._index - self._index = curr_index + 1 - return self.items[curr_index] - - def __getitem__(self, key): - """get item by index""" - return self._items[key] - - def __len__(self): - """Return length of elements""" - return len(self._items) - - def __del__(self): - """Destroy iterator and unlock the collector""" - logging.debug('{}:__del__()'.format(self.__class__.__name__)) - self._collector.unlock() - - -class ItemObject(object): - """Item object returned by Collector class""" - - def __init__(self, collector, hash_): - """Item object init""" - logging.debug('{}:__init__()'.format(self.__class__.__name__)) - super(ItemObject, self).__setattr__('_collector', collector) - super(ItemObject, self).__setattr__('_hash', hash_) - - def __setattr__(self, name, value): - t, item = self._collector._metrics[self._hash] - logging.debug('{}:__setattr__(name={}, value={})'.format( - self.__class__.__name__, name, value)) - setattr(item, name, value) - self._collector._metrics[self._hash] = (time.time(), item) - - def __del__(self): - """Destroy item object and unlock the collector""" - logging.debug('{}:__del__()'.format(self.__class__.__name__)) - self._collector.unlock() - - -class Collector(object): - """Thread-safe collector with aging feature""" - - def __init__(self, age_timeout): - """Initialization""" - self._metrics = {} - self._lock = RLock() - self._age_timeout = age_timeout - self._start_age_timer() - - def _start_age_timer(self): - """Start age timer""" - self._age_timer = Timer(self._age_timeout, self._on_timer) - self._age_timer.start() - - def _stop_age_timer(self): - """Stop age timer""" - self._age_timer.cancel() - - def _on_timer(self): - """Age timer""" - self._start_age_timer() - self._check_aging() - - def _check_aging(self): - """Check aging time for all items""" - self.lock() - for data_hash, data in list(self._metrics.items()): - age, item = data - if ((time.time() - age) >= self._age_timeout): - # aging time has expired, remove the item from the collector - logging.debug('{}:_check_aging():value={}'.format( - self.__class__.__name__, item)) - self._metrics.pop(data_hash) - del(item) - self.unlock() - - def lock(self): - """Lock the collector""" - logging.debug('{}:lock()'.format(self.__class__.__name__)) - self._lock.acquire() - - def unlock(self): - """Unlock the collector""" - logging.debug('{}:unlock()'.format(self.__class__.__name__)) - self._lock.release() - - def get(self, hash_): - self.lock() - if hash_ in self._metrics: - return ItemObject(self, hash_) - self.unlock() - return None - - def add(self, item): - """Add an item into the collector""" - self.lock() - logging.debug('{}:add(item={})'.format(self.__class__.__name__, item)) - self._metrics[hash(item)] = (time.time(), item) - self.unlock() - - def items(self, select_list=[]): - """Returns locked (safe) item iterator""" - metrics = [] - self.lock() - for k, item in list(self._metrics.items()): - _, value = item - for select in select_list: - if value.match(**select): - metrics.append(value) - return ItemIterator(self, metrics) - - def destroy(self): - """Destroy the collector""" - self._stop_age_timer() - - -class CollectdData(object): - """Base class for Collectd data""" - - def __init__(self, host=None, plugin=None, plugin_instance=None, - type_=None, type_instance=None, time_=None): - """Class initialization""" - self.host = host - self.plugin = plugin - self.plugin_instance = plugin_instance - self.type_instance = type_instance - self.type = type_ - self.time = time_ - - @classmethod - def is_regular_expression(cls, expr): - return len(expr) > 1 and expr[0] == '/' and expr[-1] == '/' - - def match(self, **kargs): - # compare the metric - for key, value in list(kargs.items()): - if self.is_regular_expression(value): - if re.match(value[1:-1], getattr(self, key)) is None: - return False - elif value != getattr(self, key): - return False - # return match event if kargs is empty - return True - - -class CollectdNotification(CollectdData): - """Collectd notification""" - - def __init__(self, host=None, plugin=None, plugin_instance=None, - type_=None, type_instance=None, severity=None, message=None): - super(CollectdNotification, self).__init__( - host, plugin, plugin_instance, type_, type_instance) - self.severity = severity - self.message = message - - def __repr__(self): - return '{}(host={}, plugin={}, plugin_instance={}, type={}, ' \ - 'type_instance={}, severity={}, message={}, time={})'.format( - self.__class__.__name__, self.host, self.plugin, - self.plugin_instance, self.type, self.type_instance, - self.severity, self.message, time) - - -class CollectdValue(CollectdData): - """Collectd value""" - - def __init__(self, host=None, plugin=None, plugin_instance=None, - type_=None, type_instance=None, ds_name='value', value=None, - interval=None): - super(CollectdValue, self).__init__( - host, plugin, plugin_instance, type_, type_instance) - self.value = value - self.ds_name = ds_name - self.interval = interval - - @classmethod - def hash_gen(cls, host, plugin, plugin_instance, type_, - type_instance, ds_name): - return hash((host, plugin, plugin_instance, type_, - type_instance, ds_name)) - - def __eq__(self, other): - return hash(self) == hash(other) and self.value == other.value - - def __hash__(self): - return self.hash_gen(self.host, self.plugin, self.plugin_instance, - self.type, self.type_instance, self.ds_name) - - def __repr__(self): - return '{}(host={}, plugin={}, plugin_instance={}, type={}, ' \ - 'type_instance={}, ds_name={}, value={}, time={})'.format( - self.__class__.__name__, self.host, self.plugin, - self.plugin_instance, self.type, self.type_instance, - self.ds_name, self.value, self.time) - - -class Item(yaml.YAMLObject): - """Base class to process tags like ArrayItem/ValueItem""" - - @classmethod - def format_node(cls, mapping, metric): - if mapping.tag in [ - 'tag:yaml.org,2002:str', Bytes2Kibibytes.yaml_tag, - Number.yaml_tag, StripExtraDash.yaml_tag]: - return yaml.ScalarNode(mapping.tag, mapping.value.format(**metric)) - elif mapping.tag == 'tag:yaml.org,2002:map': - values = [] - for key, value in mapping.value: - values.append((yaml.ScalarNode(key.tag, key.value), - cls.format_node(value, metric))) - return yaml.MappingNode(mapping.tag, values) - elif mapping.tag in [ArrayItem.yaml_tag, ValueItem.yaml_tag]: - values = [] - for seq in mapping.value: - map_values = list() - for key, value in seq.value: - if key.value == 'SELECT': - map_values.append((yaml.ScalarNode(key.tag, key.value), - cls.format_node(value, metric))) - else: - map_values.append((yaml.ScalarNode(key.tag, key.value), - value)) - values.append(yaml.MappingNode(seq.tag, map_values)) - return yaml.SequenceNode(mapping.tag, values) - elif mapping.tag in [MapValue.yaml_tag]: - values = [] - for key, value in mapping.value: - if key.value == 'VALUE': - values.append((yaml.ScalarNode(key.tag, key.value), - cls.format_node(value, metric))) - else: - values.append((yaml.ScalarNode(key.tag, key.value), value)) - return yaml.MappingNode(mapping.tag, values) - return mapping - - -class ValueItem(Item): - """Class to process VlaueItem tag""" - yaml_tag = '!ValueItem' - - @classmethod - def from_yaml(cls, loader, node): - logging.debug('{}:from_yaml(loader={})'.format(cls.__name__, loader)) - default, select, value_desc = None, list(), None - # find value description - for elem in node.value: - for key, value in elem.value: - if key.value == 'VALUE': - assert value_desc is None, "VALUE key already set" - value_desc = value - if key.value == 'SELECT': - select.append(loader.construct_mapping(value)) - if key.value == 'DEFAULT': - assert default is None, "DEFAULT key already set" - default = loader.construct_object(value) - # if VALUE key isn't given, use default VALUE key - # format: `VALUE: !Number '{vl.value}'` - if value_desc is None: - value_desc = yaml.ScalarNode(tag='!Number', value='{vl.value}') - # select collectd metric based on SELECT condition - metrics = loader.collector.items(select) - assert len(metrics) < 2, \ - 'Wrong SELECT condition {}, selected {} metrics'.format( - select, len(metrics)) - if len(metrics) > 0: - item = cls.format_node(value_desc, {'vl': metrics[0], - 'system': loader.system}) - return loader.construct_object(item) - # nothing has been found by SELECT condition, set to DEFAULT value. - assert default is not None, "No metrics selected by SELECT condition" \ - " {} and DEFAULT key isn't set".format(select) - return default - - -class ArrayItem(Item): - """Class to process ArrayItem tag""" - yaml_tag = '!ArrayItem' - - @classmethod - def from_yaml(cls, loader, node): - logging.debug('{}:process(loader={}, node={})'.format(cls.__name__, - loader, node)) - # e.g.: - # SequenceNode(tag=u'!ArrayItem', value=[ - # MappingNode(tag=u'tag:yaml.org,2002:map', value=[ - # (ScalarNode(tag=u'tag:yaml.org,2002:str', value=u'SELECT'), - # MappingNode(tag=u'tag:yaml.org,2002:map', value=[ - # (ScalarNode(tag=u'tag:yaml.org,2002:str', value=u'plugin'), - # , ...) - # ]), ... - # ), (key, value), ... ]) - # , ... ]) - assert isinstance(node, yaml.SequenceNode), \ - "{} tag isn't YAML array".format(cls.__name__) - select, index_keys, items, item_desc = list(), list(), list(), None - for elem in node.value: - for key, value in elem.value: - if key.value == 'ITEM-DESC': - assert item_desc is None, "ITEM-DESC key already set" - item_desc = value - if key.value == 'INDEX-KEY': - assert len(index_keys) == 0, "INDEX-KEY key already set" - index_keys = loader.construct_sequence(value) - if key.value == 'SELECT': - select.append(loader.construct_mapping(value)) - # validate item description - assert item_desc is not None, "Mandatory ITEM-DESC key isn't set" - assert len(select) > 0 or len(index_keys) > 0, \ - "Mandatory key (INDEX-KEY or SELECT) isn't set" - metrics = loader.collector.items(select) - # select metrics based on INDEX-KEY provided - if len(index_keys) > 0: - metric_set = set() - for metric in metrics: - value = CollectdValue() - for key in index_keys: - setattr(value, key, getattr(metric, key)) - metric_set.add(value) - metrics = list(metric_set) - # build items based on SELECT and/or INDEX-KEY criteria - for metric in metrics: - item = cls.format_node(item_desc, - {'vl': metric, 'system': loader.system, - 'config': loader.config}) - items.append(loader.construct_mapping(item)) - return items - - -class Measurements(ArrayItem): - """Class to process Measurements tag""" - yaml_tag = '!Measurements' - - -class Events(Item): - """Class to process Events tag""" - yaml_tag = '!Events' - - @classmethod - def from_yaml(cls, loader, node): - condition, item_desc = dict(), None - for elem in node.value: - for key, value in elem.value: - if key.value == 'ITEM-DESC': - item_desc = value - if key.value == 'CONDITION': - condition = loader.construct_mapping(value) - assert item_desc is not None, "Mandatory ITEM-DESC key isn't set" - if loader.notification.match(**condition): - item = cls.format_node(item_desc, { - 'n': loader.notification, 'system': loader.system}) - return loader.construct_mapping(item) - return None - - -class Bytes2Kibibytes(yaml.YAMLObject): - """Class to process Bytes2Kibibytes tag""" - yaml_tag = '!Bytes2Kibibytes' - - @classmethod - def from_yaml(cls, loader, node): - return round(float(node.value) / 1024.0, 3) - - -class Number(yaml.YAMLObject): - """Class to process Number tag""" - yaml_tag = '!Number' - - @classmethod - def from_yaml(cls, loader, node): - try: - return int(node.value) - except ValueError: - return float(node.value) - - -class StripExtraDash(yaml.YAMLObject): - """Class to process StripExtraDash tag""" - yaml_tag = '!StripExtraDash' - - @classmethod - def from_yaml(cls, loader, node): - return '-'.join([x for x in node.value.split('-') if len(x) > 0]) - - -class MapValue(yaml.YAMLObject): - """Class to process MapValue tag""" - yaml_tag = '!MapValue' - - @classmethod - def from_yaml(cls, loader, node): - mapping, val = None, None - for key, value in node.value: - if key.value == 'TO': - mapping = loader.construct_mapping(value) - if key.value == 'VALUE': - val = loader.construct_object(value) - assert mapping is not None, "Mandatory TO key isn't set" - assert val is not None, "Mandatory VALUE key isn't set" - assert val in mapping, \ - 'Value "{}" cannot be mapped to any of {} values'.format( - val, list(mapping.keys())) - return mapping[val] - - -class Normalizer(object): - """Normalization class which handles events and measurements""" - - def __init__(self): - """Init""" - self.interval = None - self.collector = None - self.system = None - self.queue = None - self.timer = None - - @classmethod - def read_configuration(cls, config_file): - """read YAML configuration file""" - # load YAML events/measurements definition - f = open(config_file, 'r') - doc_yaml = yaml.compose(f) - f.close() - # split events & measurements definitions - measurements, events = list(), list() - for key, value in doc_yaml.value: - if value.tag == Measurements.yaml_tag: - measurements.append((key, value)) - if value.tag == Events.yaml_tag: - events.append((key, value)) - measurements_yaml = yaml.MappingNode('tag:yaml.org,2002:map', - measurements) - measurements_stream = yaml.serialize(measurements_yaml) - events_yaml = yaml.MappingNode('tag:yaml.org,2002:map', events) - events_stream = yaml.serialize(events_yaml) - # return event & measurements definition - return events_stream, measurements_stream - - def initialize(self, config_file, interval): - """Initialize the class""" - e, m = self.read_configuration(config_file) - self.measurements_stream = m - self.events_stream = e - self.system = System() - self.config = Config(interval) - self.interval = interval - # start collector with aging time = double interval - self.collector = Collector(interval * 2) - # initialize event thread - self.queue = queue.Queue() - self.event_thread = Thread(target=self.event_worker) - self.event_thread.daemon = True - self.event_thread.start() - # initialize measurements timer - self.start_timer() - - def destroy(self): - """Destroy the class""" - self.collector.destroy() - self.post_event(None) # send stop event - self.event_thread.join() - self.stop_timer() - - def start_timer(self): - """Start measurements timer""" - self.timer = Timer(self.interval, self.on_timer) - self.timer.start() - - def stop_timer(self): - """Stop measurements timer""" - self.timer.cancel() - - def on_timer(self): - """Measurements timer""" - self.start_timer() - self.process_measurements() - - def event_worker(self): - """Event worker""" - while True: - event = self.queue.get() - if isinstance(event, CollectdNotification): - self.process_notify(event) - continue - # exit for the worker - break - - def get_collector(self): - """Get metric collector reference""" - return self.collector - - def process_measurements(self): - """Process measurements""" - loader = yaml.Loader(self.measurements_stream) - setattr(loader, 'collector', self.collector) - setattr(loader, 'system', self.system) - setattr(loader, 'config', self.config) - measurements = loader.get_data() - for measurement_name in measurements: - logging.debug('Process "{}" measurements: {}'.format( - measurement_name, measurements[measurement_name])) - for measurement in measurements[measurement_name]: - self.send_data(measurement) - - def process_notify(self, notification): - """Process events""" - loader = Loader(self.events_stream) - setattr(loader, 'notification', notification) - setattr(loader, 'system', self.system) - notifications = loader.get_data() - for notify_name in notifications: - logging.debug('Process "{}" notification'.format(notify_name)) - if notifications[notify_name] is not None: - self.send_data(notifications[notify_name]) - - def send_data(self, data): - """Send data""" - assert False, 'send_data() is abstract function and MUST be overridden' - - def post_event(self, notification): - """Post notification into the queue to process""" - self.queue.put(notification) diff --git a/agent/barometer/3rd_party/collectd-agent-app/agent_app/yaml/host.yaml b/agent/barometer/3rd_party/collectd-agent-app/agent_app/yaml/host.yaml deleted file mode 100755 index 3d719da..0000000 --- a/agent/barometer/3rd_party/collectd-agent-app/agent_app/yaml/host.yaml +++ /dev/null @@ -1,288 +0,0 @@ ---- -# Common event header definition (required fields and defaults) -commonEventHeader: &commonEventHeader - domain: "measurement" - eventId: "{system.id}" - eventName: "" - eventType: Info - lastEpochMicrosec: 0 - priority: Normal - reportingEntityId: &reportingEntityId "{system.hostname}" - reportingEntityName: *reportingEntityId - sequence: 0 - sourceName: N/A - startEpochMicrosec: 0 - version: "4.0" - vesEventListenerVersion: "7.2.1" - -# Host measurements definition -Host Measurements: !Measurements - - ITEM-DESC: - event: - commonEventHeader: &hostCommonEventHeader - <<: *commonEventHeader - eventType: platform - domain: measurement - sourceId: &sourceId "{vl.host}" - sourceName: *sourceId - startEpochMicrosec: !Number "{vl.time}" - measurementFields: &hostMeasurementFields - measurementFieldsVersion: "4.0" - measurementInterval: !Number "{vl.interval}" - loadArray: !ArrayItem - - SELECT: - host: "{vl.host}" - plugin: load - type: load - ds_name: midterm - - ITEM-DESC: - midTerm : !Number "{vl.value}" - shortTerm : !ValueItem - - SELECT: - host: "{vl.host}" - plugin: load - type: load - ds_name: shortterm - longTerm : !ValueItem - - SELECT: - host: "{vl.host}" - plugin: load - type: load - ds_name: longterm - memoryUsageArray: !ArrayItem - - SELECT: - host: "{vl.host}" - plugin: memory - type: memory - type_instance: free - - ITEM-DESC: - vmIdentifier: "{vl.host}" - memoryFree: !Number "{vl.value}" - memoryUsed: !ValueItem - - SELECT: - host: "{vl.host}" - plugin: memory - type: memory - type_instance: used - - VALUE: !Bytes2Kibibytes "{vl.value}" - memoryBuffered: !ValueItem - - SELECT: - host: "{vl.host}" - plugin: memory - type: memory - type_instance: buffered - - VALUE: !Bytes2Kibibytes "{vl.value}" - memoryCached: !ValueItem - - SELECT: - host: "{vl.host}" - plugin: memory - type: memory - type_instance: cached - - VALUE: !Bytes2Kibibytes "{vl.value}" - memorySlabRecl: !ValueItem - - SELECT: - host: "{vl.host}" - plugin: memory - type: memory - type_instance: slab_recl - - VALUE: !Bytes2Kibibytes "{vl.value}" - - DEFAULT: 0 - memorySlabUnrecl: !ValueItem - - SELECT: - host: "{vl.host}" - plugin: memory - type: memory - type_instance: slab_unrecl - - VALUE: !Bytes2Kibibytes "{vl.value}" - - DEFAULT: 0 - cpuUsageArray: !ArrayItem - - SELECT: - host: "{vl.host}" - plugin: cpu - type: percent - type_instance: idle - - ITEM-DESC: - cpuIdentifier: "{vl.plugin_instance}" - cpuIdle: !Number "{vl.value}" - percentUsage: 0.0 - cpuUsageUser: !ValueItem - - SELECT: - host: "{vl.host}" - plugin: cpu - plugin_instance: "{vl.plugin_instance}" - type: percent - type_instance: user - cpuWait: !ValueItem - - SELECT: - host: "{vl.host}" - plugin: cpu - plugin_instance: "{vl.plugin_instance}" - type: percent - type_instance: wait - cpuUsageInterrupt: !ValueItem - - SELECT: - host: "{vl.host}" - plugin: cpu - plugin_instance: "{vl.plugin_instance}" - type: percent - type_instance: interrupt - cpuUsageNice: !ValueItem - - SELECT: - host: "{vl.host}" - plugin: cpu - plugin_instance: "{vl.plugin_instance}" - type: percent - type_instance: nice - cpuUsageSoftIrq: !ValueItem - - SELECT: - host: "{vl.host}" - plugin: cpu - plugin_instance: "{vl.plugin_instance}" - type: percent - type_instance: softirq - cpuUsageSteal: !ValueItem - - SELECT: - host: "{vl.host}" - plugin: cpu - plugin_instance: "{vl.plugin_instance}" - type: percent - type_instance: steal - cpuUsageSystem: !ValueItem - - SELECT: - host: "{vl.host}" - plugin: cpu - plugin_instance: "{vl.plugin_instance}" - type: percent - type_instance: system - nicPerformanceArray: !ArrayItem - - SELECT: - host: "{vl.host}" - plugin: interface - type: if_packets - ds_name: rx - - ITEM-DESC: - valuesAreSuspect: "true" - nicIdentifier: "{vl.plugin_instance}" - receivedTotalPacketsAccumulated: !Number "{vl.value}" - transmittedTotalPacketsAccumulated: !ValueItem - - SELECT: - host: "{vl.host}" - plugin: interface - plugin_instance: "{vl.plugin_instance}" - type: if_packets - ds_name: tx - receivedOctetsAccumulated: !ValueItem - - SELECT: - host: "{vl.host}" - plugin: interface - plugin_instance: "{vl.plugin_instance}" - type: if_octets - ds_name: rx - transmittedOctetsAccumulated: !ValueItem - - SELECT: - host: "{vl.host}" - plugin: interface - plugin_instance: "{vl.plugin_instance}" - type: if_octets - ds_name: tx - receivedErrorPacketsAccumulated: !ValueItem - - SELECT: - host: "{vl.host}" - plugin: interface - plugin_instance: "{vl.plugin_instance}" - type: if_errors - ds_name: rx - transmittedErrorPacketsAccumulated: !ValueItem - - SELECT: - host: "{vl.host}" - plugin: interface - plugin_instance: "{vl.plugin_instance}" - type: if_errors - ds_name: tx - receivedDiscardedPacketsAccumulated: !ValueItem - - SELECT: - host: "{vl.host}" - plugin: interface - plugin_instance: "{vl.plugin_instance}" - type: if_dropped - ds_name: rx - transmittedDiscardedPacketsAccumulated: !ValueItem - - SELECT: - host: "{vl.host}" - plugin: interface - plugin_instance: "{vl.plugin_instance}" - type: if_dropped - ds_name: tx - diskUsageArray: !ArrayItem - - SELECT: - host: "{vl.host}" - plugin: disk - type: disk_octets - ds_name: read - - ITEM-DESC: - diskIdentifier: "{vl.plugin_instance}" - diskOctetsReadLast: !Number "{vl.value}" - diskOctetsWriteLast: !ValueItem - - SELECT: - host: "{vl.host}" - plugin: disk - plugin_instance: "{vl.plugin_instance}" - type: disk_octets - ds_name: write - diskOpsReadLast: !ValueItem - - SELECT: - host: "{vl.host}" - plugin: disk - plugin_instance: "{vl.plugin_instance}" - type: disk_ops - ds_name: read - diskOpsWriteLast: !ValueItem - - SELECT: - host: "{vl.host}" - plugin: disk - plugin_instance: "{vl.plugin_instance}" - type: disk_ops - ds_name: write - diskIoTimeLast: !ValueItem - - SELECT: - host: "{vl.host}" - plugin: disk - plugin_instance: "{vl.plugin_instance}" - type: disk_io_time - ds_name: io_time - - DEFAULT: 0 - diskMergedReadLast: !ValueItem - - SELECT: - host: "{vl.host}" - plugin: disk - plugin_instance: "{vl.plugin_instance}" - type: disk_merged - ds_name: read - - DEFAULT: 0 - diskMergedWriteLast: !ValueItem - - SELECT: - host: "{vl.host}" - plugin: disk - plugin_instance: "{vl.plugin_instance}" - type: disk_merged - ds_name: write - - DEFAULT: 0 - diskTimeReadLast: !ValueItem - - SELECT: - host: "{vl.host}" - plugin: disk - plugin_instance: "{vl.plugin_instance}" - type: disk_time - ds_name: read - - DEFAULT: 0 - diskTimeWriteLast: !ValueItem - - SELECT: - host: "{vl.host}" - plugin: disk - plugin_instance: "{vl.plugin_instance}" - type: disk_time - ds_name: write - - DEFAULT: 0 - - SELECT: - plugin: memory - type_instance: free diff --git a/agent/barometer/LICENSE b/agent/barometer/LICENSE deleted file mode 100755 index 88cb3c7..0000000 --- a/agent/barometer/LICENSE +++ /dev/null @@ -1,14 +0,0 @@ -Original work Copyright 2015 Open Platform for NFV Project, Inc. and its contributors -Modified work Copyright 2021 Xoriant Corporation - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. diff --git a/agent/start.sh b/agent/start.sh deleted file mode 100755 index a65c6d7..0000000 --- a/agent/start.sh +++ /dev/null @@ -1,74 +0,0 @@ -#!/bin/bash -# Original work Copyright 2017 AT&T Intellectual Property, Inc -# Modified work Copyright 2021 Xoriant Corporation -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -#. What this is: Startup script for the OPNFV Agent running under docker. - -echo "Agent is trying to connect Kafka Broker.." -timeout 1m bash -c 'until printf "" 2>>/dev/null >>/dev/tcp/$agent_kafka_host/$agent_kafka_port; do sleep 2; done' -success=$? -if [ $success -eq 0 ] - then - echo "Kafka is up.." - else - echo "No Kafka found .. exiting container.." - exit; -fi - -echo "Agent is trying to connect smo-collector.." -timeout 1m bash -c 'until printf "" 2>>/dev/null >>/dev/tcp/$smo_collector_host/$smo_collector_port; do sleep 2; done' -success=$? -if [ $success -eq 0 ] - then - echo "smo-collector is up.." - else - echo "No smo-collector found .. exiting container.." - exit; -fi - -echo "$agent_kafka_host $agent_kafka_host" >>/etc/hosts -echo "agent_kafka_host =$agent_kafka_host" -echo "*** /etc/hosts ***" -cat /etc/hosts - -cd /opt/smo/barometer/3rd_party/collectd-agent-app/agent_app -cat <agent_app_config.conf -[config] -Domain = $smo_collector_host -Port = $smo_collector_port -Path = $smo_collector_path -Directory_path = $smo_collector_directory_path -UseHttps = $smo_collector_https -Username = $smo_collector_user -Password = $smo_collector_pass -SendEventInterval = $agent_interval -ApiVersion = $smo_collector_version -KafkaPort = $agent_kafka_port -KafkaBroker = $agent_kafka_host -EOF - -cat agent_app_config.conf -echo "agent_mode=$agent_mode" - -if [[ "$loglevel" == "" ]]; then - loglevel=ERROR -fi - -python3 agent_app.py --events-schema=$agent_mode.yaml --loglevel $loglevel \ - --config=agent_app_config.conf - -# Dump agent_app.log if the command above exits (fails) -echo "*** agent_app.log ***" -cat agent_app.log diff --git a/docker-compose.yaml b/docker-compose.yaml index 1c0edb6..312b839 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -35,30 +35,6 @@ services: depends_on: - smo-influxdb - smo-influxdb-connector - agent-zookeeper: - container_name: agent-zookeeper - image: confluentinc/cp-zookeeper:5.5.6 - networks: - - agent-net - environment: - ZOOKEEPER_CLIENT_PORT: 2181 - ZOOKEEPER_TICK_TIME: 2000 - agent-kafka: - container_name: agent-kafka - image: confluentinc/cp-kafka:5.5.6 - networks: - - agent-net - depends_on: - - agent-zookeeper - ports: - - 9092:9092 - environment: - KAFKA_BROKER_ID: 1 - KAFKA_ZOOKEEPER_CONNECT: agent-zookeeper:2181 - KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://agent-kafka:9092,PLAINTEXT_HOST://localhost:19092 - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT - KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 smo-zookeeper: container_name: smo-zookeeper image: confluentinc/cp-zookeeper:5.5.6 @@ -83,17 +59,6 @@ services: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 - agent-kafdrop: - container_name: agent-kafdrop - image: obsidiandynamics/kafdrop:3.27.0 - networks: - - agent-net - depends_on: - - agent-kafka - ports: - - 9000:9000 - environment: - KAFKA_BROKERCONNECT: agent-kafka:9092 smo-kafdrop: container_name: smo-kafdrop image: obsidiandynamics/kafdrop:3.27.0 @@ -107,8 +72,7 @@ services: KAFKA_BROKERCONNECT: smo-kafka:29092 smo-dmaap-adapter: container_name: smo-dmaap-adapter - build: ./dmaapadapter - image: smo-dmaap-adapter + image: nexus3.o-ran-sc.org:10004/smo-dmaap-adapter:5.0.1 networks: - smo-net ports: @@ -120,10 +84,8 @@ services: enable_assert: "False" smo-collector: container_name: smo-collector - build: ./collector - image: smo-collector + image: nexus3.o-ran-sc.org:10004/smo-collector:5.0.1 networks: - - agent-net - smo-net ports: - 9999:9999 @@ -144,8 +106,7 @@ services: - smo-kafka smo-influxdb-connector: container_name: smo-influxdb-connector - build: ./influxdb-connector - image: smo-influxdb-connector + image: nexus3.o-ran-sc.org:10004/smo-influxdb-connector:5.0.1 networks: - smo-net ports: @@ -160,34 +121,9 @@ services: depends_on: - smo-kafka - smo-influxdb - agent: - container_name: agent - build: ./agent - image: agent - networks: - - agent-net - restart: always - environment: - smo_collector_host: "smo-collector" - smo_collector_port: "9999" - smo_collector_path: "" - smo_collector_directory_path: "events" - smo_collector_https: "True" - smo_collector_user: "user" - smo_collector_pass: "password" - smo_collector_version: "5" - agent_interval: "10" - agent_kafka_port: "9092" - agent_kafka_host: "agent-kafka" - agent_mode: "./yaml/host" - loglevel: "ERROR" - depends_on: - - agent-kafka - - smo-collector smo-post-config: container_name: smo-post-config - build: ./postconfig - image: smo-post-config + image: nexus3.o-ran-sc.org:10004/smo-post-config:5.0.1 environment: smo_influxdb_host: "smo-influxdb" smo_influxdb_port: "8086" @@ -200,9 +136,6 @@ services: - smo-net networks: - agent-net: - driver: bridge - name: agent-net smo-net: driver: bridge name: smo-net