From 00f8767cba07c11176748171be32bbdb9a42efcb Mon Sep 17 00:00:00 2001 From: santanude Date: Tue, 2 Nov 2021 17:55:00 +0530 Subject: [PATCH] Add connector between Kafka bus and InfluxdB to sync VES events VES events will be written to Kafka bus by default. However, if an InfluxdB exists in SMO, then the event needs to be synced to InfluxdB also. Issue-Id: SMO-22 Signed-off-by: santanude Change-Id: I152b0ad5a7b6676eef702e3c3811c2f381b0f4f8 Signed-off-by: santanude --- .../evel-test-collector/code/collector/monitor.py | 263 +------------ docker-compose.yaml | 22 +- kafka-connector-influxdb/Dockerfile | 27 ++ kafka-connector-influxdb/Makefile | 5 + .../influxdb-connector/LICENSE.md | 14 + .../influxdb-connector/code/influxdb_connector.py | 413 +++++++++++++++++++++ .../config/influxdb_connector.conf | 4 + kafka-connector-influxdb/start.sh | 57 +++ 8 files changed, 542 insertions(+), 263 deletions(-) create mode 100755 kafka-connector-influxdb/Dockerfile create mode 100755 kafka-connector-influxdb/Makefile create mode 100755 kafka-connector-influxdb/influxdb-connector/LICENSE.md create mode 100644 kafka-connector-influxdb/influxdb-connector/code/influxdb_connector.py create mode 100755 kafka-connector-influxdb/influxdb-connector/config/influxdb_connector.conf create mode 100755 kafka-connector-influxdb/start.sh diff --git a/collector/evel-test-collector/code/collector/monitor.py b/collector/evel-test-collector/code/collector/monitor.py index ae9de29..dc387db 100755 --- a/collector/evel-test-collector/code/collector/monitor.py +++ b/collector/evel-test-collector/code/collector/monitor.py @@ -33,7 +33,6 @@ from base64 import b64decode import json import jsonschema from functools import partial -import requests from datetime import timezone from elasticsearch import Elasticsearch from kafka import KafkaProducer @@ -221,9 +220,6 @@ def listener(environ, start_response, schema): # if word 'elasticsearch' exists in config file then save data in elasticsearch if('elasticsearch' in data_storageArr): save_event_in_elasticsearch(body) - # if word 'influxdb' exists in config file then save data in influxdb - if('influxdb' in data_storageArr): - save_event_in_db(body) except jsonschema.SchemaError as e: logger.error('Schema is not valid! {0}'.format(e)) @@ -253,18 +249,6 @@ def listener(environ, start_response, schema): logger.error('Event invalid for unexpected reason! {0}'.format(e)) -# -------------------------------------------------------------------------- -# Send event to influxdb -# -------------------------------------------------------------------------- - -def send_to_influxdb(event, pdata): - url = 'http://{}/write?db=veseventsdb'.format(influxdb) - logger.debug('Send {} to influxdb at {}: {}'.format(event, influxdb, pdata)) - r = requests.post(url, data=pdata, headers={'Content-Type': 'text/plain'}) - logger.info('influxdb return code {}'.format(r.status_code)) - if r.status_code != 204: - logger.debug('*** Influxdb save failed, return code {} ***'.format(r.status_code)) - # -------------------------------------------------------------------------- # Save event data in Kafka # -------------------------------------------------------------------------- @@ -298,6 +282,7 @@ def produce_events_in_kafka(jobj, topic): # Save event data in Elasticsearch # -------------------------------------------------------------------------- + def format_timestamp(EpochMicrosec): if isinstance(EpochMicrosec, int): return datetime.datetime.fromtimestamp(int(str(EpochMicrosec)[:10]), tz=timezone.utc) @@ -400,252 +385,6 @@ def save_event_in_elasticsearch(body): es.index(index=domain, body=jobj['event']) -def process_additional_measurements(val, domain, eventId, startEpochMicrosec, lastEpochMicrosec): - for additionalMeasurements in val: - pdata = domain + ",eventId={},system={}".format(eventId, source) - nonstringpdata = " startEpochMicrosec={},lastEpochMicrosec={},".format(startEpochMicrosec, lastEpochMicrosec) - for key, val in additionalMeasurements.items(): - if isinstance(val, str): - pdata = pdata + ',{}={}'.format(key, process_special_char(val)) - elif isinstance(val, dict): - for key2, val2 in val.items(): - if isinstance(val2, str): - pdata = pdata + ',{}={}'.format(key2, process_special_char(val2)) - else: - nonstringpdata = nonstringpdata + '{}={},'.format(key2, val2) - else: - nonstringpdata = nonstringpdata + '{}={},'.format(key, val) - send_to_influxdb(domain, pdata + nonstringpdata[:-1] + ' ' + process_time(eventTimestamp)) - - -def process_nonadditional_measurements(val, domain, eventId, startEpochMicrosec, lastEpochMicrosec): - for disk in val: - pdata = domain + ",eventId={},system={}".format(eventId, source) - nonstringpdata = " startEpochMicrosec={},lastEpochMicrosec={},".format(startEpochMicrosec, lastEpochMicrosec) - for key, val in disk.items(): - if isinstance(val, str): - pdata = pdata + ',{}={}'.format(key, process_special_char(val)) - else: - nonstringpdata = nonstringpdata + '{}={},'.format(key, val) - - send_to_influxdb(domain, pdata + nonstringpdata[:-1] + ' ' + process_time(eventTimestamp)) - - -def process_pnfRegistration_event(domain, jobj, pdata, nonstringpdata): - pdata = pdata + ",system={}".format(source) - for key, val in jobj.items(): - if key != 'additionalFields' and val != "": - if isinstance(val, str): - pdata = pdata + ',{}={}'.format(key, process_special_char(val)) - else: - nonstringpdata = nonstringpdata + '{}={},'.format(key, val) - elif key == 'additionalFields': - for key2, val2 in val.items(): - if val2 != "" and isinstance(val2, str): - pdata = pdata + ',{}={}'.format(key2, process_special_char(val2)) - elif val2 != "": - nonstringpdata = nonstringpdata + '{}={},'.format(key2, val2) - - send_to_influxdb(domain, pdata + nonstringpdata[:-1] + ' ' + process_time(eventTimestamp)) - - -def process_thresholdCrossingAlert_event(domain, jobj, pdata, nonstringpdata): - pdata = pdata + ",system={}".format(source) - for key, val in jobj.items(): - if (key != 'additionalFields' and key != 'additionalParameters' and key != 'associatedAlertIdList') and val != "": - if isinstance(val, str): - pdata = pdata + ',{}={}'.format(key, process_special_char(val)) - else: - nonstringpdata = nonstringpdata + '{}={},'.format(key, val) - elif key == 'additionalFields': - for key2, val2 in val.items(): - if val2 != "" and isinstance(val2, str): - pdata = pdata + ',{}={}'.format(key2, process_special_char(val2)) - elif val2 != "": - nonstringpdata = nonstringpdata + '{}={},'.format(key2, val2) - elif key == 'additionalParameters': - for addParameter in val: - for key2, val2 in addParameter.items(): - if key2 != "hashMap": - if val2 != "" and isinstance(val2, str): - pdata = pdata + ',{}={}'.format(key2, process_special_char(val2)) - elif val2 != "": - nonstringpdata = nonstringpdata + '{}={},'.format(key2, val2) - elif key2 == "hashMap": - for key3, val3 in val2.items(): - if val3 != "" and isinstance(val3, str): - pdata = pdata + ',{}={}'.format(key3, process_special_char(val3)) - elif val3 != "": - nonstringpdata = nonstringpdata + '{}={},'.format(key3, val3) - elif key == 'associatedAlertIdList': - associatedAlertIdList = "" - for associatedAlertId in val: - associatedAlertIdList = associatedAlertIdList + associatedAlertId + "|" - if(associatedAlertIdList != ""): - pdata = pdata + ',{}={}'.format("associatedAlertIdList", - process_special_char(associatedAlertIdList)[:-1]) - - send_to_influxdb(domain, pdata + nonstringpdata[:-1] + ' ' + process_time(eventTimestamp)) - - -def process_fault_event(domain, jobj, pdata, nonstringpdata): - pdata = pdata + ",system={}".format(source) - for key, val in jobj.items(): - if key != 'alarmAdditionalInformation' and val != "": - if isinstance(val, str): - pdata = pdata + ',{}={}'.format(key, process_special_char(val)) - else: - nonstringpdata = nonstringpdata + '{}={},'.format(key, val) - elif key == 'alarmAdditionalInformation': - for key2, val2 in val.items(): - if val2 != "" and isinstance(val2, str): - pdata = pdata + ',{}={}'.format(key2, process_special_char(val2)) - elif val2 != "": - nonstringpdata = nonstringpdata + '{}={},'.format(key2, val2) - - send_to_influxdb(domain, pdata + nonstringpdata[:-1] + ' ' + process_time(eventTimestamp)) - - -def process_heartbeat_events(domain, jobj, pdata, nonstringpdata): - pdata = pdata + ",system={}".format(source) - for key, val in jobj.items(): - if key != 'additionalFields' and val != "": - if isinstance(val, str): - pdata = pdata + ',{}={}'.format(key, process_special_char(val)) - else: - nonstringpdata = nonstringpdata + '{}={},'.format(key, val) - elif key == 'additionalFields': - for key2, val2 in val.items(): - if val2 != "" and isinstance(val2, str): - pdata = pdata + ',{}={}'.format(key2, process_special_char(val2)) - elif val2 != "": - nonstringpdata = nonstringpdata + '{}={},'.format(key2, val2) - - send_to_influxdb(domain, pdata + nonstringpdata[:-1] + ' ' + process_time(eventTimestamp)) - - -def process_measurement_events(domain, jobj, pdata, nonstringpdata, eventId, startEpochMicrosec, lastEpochMicrosec): - pdata = pdata + ",system={}".format(source) - for key, val in jobj.items(): - if val != "": - if isinstance(val, str): - pdata = pdata + ',{}={}'.format(key, process_special_char(val)) - elif isinstance(val, list): - if key == 'additionalMeasurements': - process_additional_measurements(val, domain + "additionalmeasurements", eventId, startEpochMicrosec, lastEpochMicrosec) - elif key == 'cpuUsageArray': - process_nonadditional_measurements(val, domain + "cpuusage", eventId, startEpochMicrosec, lastEpochMicrosec) - elif key == 'diskUsageArray': - process_nonadditional_measurements(val, domain + "diskusage", eventId, startEpochMicrosec, lastEpochMicrosec) - elif key == 'memoryUsageArray': - process_nonadditional_measurements(val, domain + "memoryusage", eventId, startEpochMicrosec, lastEpochMicrosec) - elif key == 'nicPerformanceArray': - process_nonadditional_measurements(val, domain + "nicperformance", eventId, startEpochMicrosec, lastEpochMicrosec) - elif key == 'loadArray': - process_nonadditional_measurements(val, domain + "load", eventId, startEpochMicrosec, lastEpochMicrosec) - elif key == 'networkSliceArray': - process_nonadditional_measurements(val, domain + "networkslice", eventId, startEpochMicrosec, lastEpochMicrosec) - elif isinstance(val, dict): - for key2, val2 in val.items(): - if isinstance(val2, str): - pdata = pdata + ',{}={}'.format(key2, process_special_char(val2)) - else: - nonstringpdata = nonstringpdata + '{}={},'.format(key2, val2) - else: - nonstringpdata = nonstringpdata + '{}={},'.format(key, val) - - send_to_influxdb(domain, pdata + nonstringpdata[:-1] + ' ' + process_time(eventTimestamp)) - - -def process_special_char(str): - for search_char, replace_char in {" ": "\ ", ",": "\,"}.items(): - str = str.replace(search_char, replace_char) - return str - - -def process_time(eventTimestamp): - eventTimestamp = str(eventTimestamp).replace(".", "") - while len(eventTimestamp) < 19: - eventTimestamp = eventTimestamp + "0" - return format(int(eventTimestamp)) - - -# -------------------------------------------------------------------------- -# Save event data -# -------------------------------------------------------------------------- -def save_event_in_db(body): - global source - global eventTimestamp - - jobj = json.loads(body) - source = "unknown" - domain = jobj['event']['commonEventHeader']['domain'] - eventTimestamp = jobj['event']['commonEventHeader']['startEpochMicrosec'] - agent = jobj['event']['commonEventHeader']['reportingEntityName'].upper() - if "LOCALHOST" in agent: - agent = "computehost" - source = jobj['event']['commonEventHeader']['sourceId'].upper() - - # processing common header part - pdata = domain - nonstringpdata = " " - commonHeaderObj = jobj['event']['commonEventHeader'].items() - for key, val in commonHeaderObj: - if val != "": - if (key != 'internalHeaderFields'): - if isinstance(val, str): - pdata = pdata + ',{}={}'.format(key, process_special_char(val)) - else: - nonstringpdata = nonstringpdata + '{}={}'.format(key, val) + ',' - if (key == 'internalHeaderFields'): - for key2, val2 in val.items(): - if val2 != "": - if isinstance(val2, str): - pdata = pdata + ',{}={}'.format(key2, process_special_char(val2)) - else: - nonstringpdata = nonstringpdata + '{}={}'.format(key2, val2) + ',' - - # processing pnfRegistration events - if 'pnfRegistrationFields' in jobj['event']: - logger.debug('Found pnfRegistrationFields') - process_pnfRegistration_event(domain, - jobj['event']['pnfRegistrationFields'], - pdata, - nonstringpdata) - - # processing thresholdCrossingAlert events - if 'thresholdCrossingAlertFields' in jobj['event']: - logger.debug('Found thresholdCrossingAlertFields') - process_thresholdCrossingAlert_event(domain, - jobj['event']['thresholdCrossingAlertFields'], - pdata, - nonstringpdata) - - # processing fault events - if 'faultFields' in jobj['event']: - logger.debug('Found faultFields') - process_fault_event(domain, jobj['event']['faultFields'], pdata, nonstringpdata) - - # process heartbeat events - if 'heartbeatFields' in jobj['event']: - logger.debug('Found Heartbeat') - process_heartbeat_events(domain, - jobj['event']['heartbeatFields'], - pdata, - nonstringpdata) - - # processing measurement events - if 'measurementFields' in jobj['event']: - logger.debug('Found measurementFields') - process_measurement_events(domain, - jobj['event']['measurementFields'], - pdata, - nonstringpdata, - jobj['event']['commonEventHeader']['eventId'], - jobj['event']['commonEventHeader']['startEpochMicrosec'], - jobj['event']['commonEventHeader']['lastEpochMicrosec']) - - def test_listener(environ, start_response, schema): ''' Handler for the Test Collector Test Control API. diff --git a/docker-compose.yaml b/docker-compose.yaml index 5ec3fee..764f21c 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -29,6 +29,9 @@ services: - 8880:3000 networks: - ves-net + depends_on: + - ves-influxdb + - kafka-connector-influxdb ves-zookeeper: container_name: ves-zookeeper image: confluentinc/cp-zookeeper:5.5.6 @@ -112,7 +115,7 @@ services: ves_influxdb_port: "8086" ves_grafana_host: "ves-grafana" ves_grafana_port: "3000" - data_storage: "influxdb|elasticsearch" + data_storage: "elasticsearch" elasticsearch_domain: "ves-elasticsearch" kafka_host_2: "smo-kafka" kafka_port_2: "29092" @@ -130,6 +133,23 @@ services: - smo-kafka - ves-influxdb - ves-grafana + kafka-connector-influxdb: + container_name: kafka-connector-influxdb + build: ./kafka-connector-influxdb + image: influxdb-connector + networks: + - ves-net + ports: + - 9990:9990 + environment: + ves_influxdb_host: "ves-influxdb" + ves_influxdb_port: "8086" + ves_loglevel: "ERROR" + kafka_host_2: "smo-kafka" + kafka_port_2: "29092" + depends_on: + - smo-kafka + - ves-influxdb ves-agent: container_name: ves-agent build: ./agent diff --git a/kafka-connector-influxdb/Dockerfile b/kafka-connector-influxdb/Dockerfile new file mode 100755 index 0000000..89b3ac7 --- /dev/null +++ b/kafka-connector-influxdb/Dockerfile @@ -0,0 +1,27 @@ +# Copyright 2021 Xoriant Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +FROM ubuntu:focal + +RUN apt-get update && apt-get -y upgrade +RUN apt-get install -y git curl python3 python3-pip +RUN pip3 install requests confluent-kafka + +# Clone influxdb-connector +RUN mkdir -p /opt/ves/influxdb-connector +ADD influxdb-connector /opt/ves/influxdb-connector + +COPY start.sh /opt/ves/start.sh +ENTRYPOINT ["/bin/bash", "/opt/ves/start.sh"] diff --git a/kafka-connector-influxdb/Makefile b/kafka-connector-influxdb/Makefile new file mode 100755 index 0000000..637376f --- /dev/null +++ b/kafka-connector-influxdb/Makefile @@ -0,0 +1,5 @@ +default: all + +all: + docker build -t influxdb-connector . + diff --git a/kafka-connector-influxdb/influxdb-connector/LICENSE.md b/kafka-connector-influxdb/influxdb-connector/LICENSE.md new file mode 100755 index 0000000..9054138 --- /dev/null +++ b/kafka-connector-influxdb/influxdb-connector/LICENSE.md @@ -0,0 +1,14 @@ +# Copyright 2021 Xoriant Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# diff --git a/kafka-connector-influxdb/influxdb-connector/code/influxdb_connector.py b/kafka-connector-influxdb/influxdb-connector/code/influxdb_connector.py new file mode 100644 index 0000000..34df978 --- /dev/null +++ b/kafka-connector-influxdb/influxdb-connector/code/influxdb_connector.py @@ -0,0 +1,413 @@ +# Copyright 2021 Xoriant Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +import platform +import json +import logging +from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter +import configparser +import logging.handlers +import requests +from confluent_kafka import Consumer, KafkaError + +# ------------------------------------------------------------------------------ +# Address of influxdb server. +# ------------------------------------------------------------------------------ + +influxdb = '127.0.0.1' + +logger = None + + +def send_to_influxdb(event, pdata): + url = 'http://{}/write?db=veseventsdb'.format(influxdb) + logger.debug('Send {} to influxdb at {}: {}'.format(event, influxdb, pdata)) + r = requests.post(url, data=pdata, headers={'Content-Type': 'text/plain'}) + logger.info('influxdb return code {}'.format(r.status_code)) + if r.status_code != 204: + logger.debug('*** Influxdb save failed, return code {} ***'.format(r.status_code)) + + +def process_additional_measurements(val, domain, eventId, startEpochMicrosec, lastEpochMicrosec): + for additionalMeasurements in val: + pdata = domain + ",eventId={},system={}".format(eventId, source) + nonstringpdata = " startEpochMicrosec={},lastEpochMicrosec={},".format(startEpochMicrosec, lastEpochMicrosec) + for key, val in additionalMeasurements.items(): + if isinstance(val, str): + pdata = pdata + ',{}={}'.format(key, process_special_char(val)) + elif isinstance(val, dict): + for key2, val2 in val.items(): + if isinstance(val2, str): + pdata = pdata + ',{}={}'.format(key2, process_special_char(val2)) + else: + nonstringpdata = nonstringpdata + '{}={},'.format(key2, val2) + else: + nonstringpdata = nonstringpdata + '{}={},'.format(key, val) + send_to_influxdb(domain, pdata + nonstringpdata[:-1] + ' ' + process_time(eventTimestamp)) + + +def process_nonadditional_measurements(val, domain, eventId, startEpochMicrosec, lastEpochMicrosec): + for disk in val: + pdata = domain + ",eventId={},system={}".format(eventId, source) + nonstringpdata = " startEpochMicrosec={},lastEpochMicrosec={},".format(startEpochMicrosec, lastEpochMicrosec) + for key, val in disk.items(): + if isinstance(val, str): + pdata = pdata + ',{}={}'.format(key, process_special_char(val)) + else: + nonstringpdata = nonstringpdata + '{}={},'.format(key, val) + + send_to_influxdb(domain, pdata + nonstringpdata[:-1] + ' ' + process_time(eventTimestamp)) + + +def process_pnfRegistration_event(domain, jobj, pdata, nonstringpdata): + pdata = pdata + ",system={}".format(source) + for key, val in jobj.items(): + if key != 'additionalFields' and val != "": + if isinstance(val, str): + pdata = pdata + ',{}={}'.format(key, process_special_char(val)) + else: + nonstringpdata = nonstringpdata + '{}={},'.format(key, val) + elif key == 'additionalFields': + for key2, val2 in val.items(): + if val2 != "" and isinstance(val2, str): + pdata = pdata + ',{}={}'.format(key2, process_special_char(val2)) + elif val2 != "": + nonstringpdata = nonstringpdata + '{}={},'.format(key2, val2) + + send_to_influxdb(domain, pdata + nonstringpdata[:-1] + ' ' + process_time(eventTimestamp)) + + +def process_thresholdCrossingAlert_event(domain, jobj, pdata, nonstringpdata): + pdata = pdata + ",system={}".format(source) + for key, val in jobj.items(): + if (key != 'additionalFields' and key != 'additionalParameters' and key != 'associatedAlertIdList') and val != "": + if isinstance(val, str): + pdata = pdata + ',{}={}'.format(key, process_special_char(val)) + else: + nonstringpdata = nonstringpdata + '{}={},'.format(key, val) + elif key == 'additionalFields': + for key2, val2 in val.items(): + if val2 != "" and isinstance(val2, str): + pdata = pdata + ',{}={}'.format(key2, process_special_char(val2)) + elif val2 != "": + nonstringpdata = nonstringpdata + '{}={},'.format(key2, val2) + elif key == 'additionalParameters': + for addParameter in val: + for key2, val2 in addParameter.items(): + if key2 != "hashMap": + if val2 != "" and isinstance(val2, str): + pdata = pdata + ',{}={}'.format(key2, process_special_char(val2)) + elif val2 != "": + nonstringpdata = nonstringpdata + '{}={},'.format(key2, val2) + elif key2 == "hashMap": + for key3, val3 in val2.items(): + if val3 != "" and isinstance(val3, str): + pdata = pdata + ',{}={}'.format(key3, process_special_char(val3)) + elif val3 != "": + nonstringpdata = nonstringpdata + '{}={},'.format(key3, val3) + elif key == 'associatedAlertIdList': + associatedAlertIdList = "" + for associatedAlertId in val: + associatedAlertIdList = associatedAlertIdList + associatedAlertId + "|" + if(associatedAlertIdList != ""): + pdata = pdata + ',{}={}'.format("associatedAlertIdList", + process_special_char(associatedAlertIdList)[:-1]) + + send_to_influxdb(domain, pdata + nonstringpdata[:-1] + ' ' + process_time(eventTimestamp)) + + +def process_fault_event(domain, jobj, pdata, nonstringpdata): + pdata = pdata + ",system={}".format(source) + for key, val in jobj.items(): + if key != 'alarmAdditionalInformation' and val != "": + if isinstance(val, str): + pdata = pdata + ',{}={}'.format(key, process_special_char(val)) + else: + nonstringpdata = nonstringpdata + '{}={},'.format(key, val) + elif key == 'alarmAdditionalInformation': + for key2, val2 in val.items(): + if val2 != "" and isinstance(val2, str): + pdata = pdata + ',{}={}'.format(key2, process_special_char(val2)) + elif val2 != "": + nonstringpdata = nonstringpdata + '{}={},'.format(key2, val2) + + send_to_influxdb(domain, pdata + nonstringpdata[:-1] + ' ' + process_time(eventTimestamp)) + + +def process_heartbeat_events(domain, jobj, pdata, nonstringpdata): + pdata = pdata + ",system={}".format(source) + for key, val in jobj.items(): + if key != 'additionalFields' and val != "": + if isinstance(val, str): + pdata = pdata + ',{}={}'.format(key, process_special_char(val)) + else: + nonstringpdata = nonstringpdata + '{}={},'.format(key, val) + elif key == 'additionalFields': + for key2, val2 in val.items(): + if val2 != "" and isinstance(val2, str): + pdata = pdata + ',{}={}'.format(key2, process_special_char(val2)) + elif val2 != "": + nonstringpdata = nonstringpdata + '{}={},'.format(key2, val2) + + send_to_influxdb(domain, pdata + nonstringpdata[:-1] + ' ' + process_time(eventTimestamp)) + + +def process_measurement_events(domain, jobj, pdata, nonstringpdata, eventId, startEpochMicrosec, lastEpochMicrosec): + pdata = pdata + ",system={}".format(source) + for key, val in jobj.items(): + if val != "": + if isinstance(val, str): + pdata = pdata + ',{}={}'.format(key, process_special_char(val)) + elif isinstance(val, list): + if key == 'additionalMeasurements': + process_additional_measurements(val, domain + "additionalmeasurements", eventId, startEpochMicrosec, lastEpochMicrosec) + elif key == 'cpuUsageArray': + process_nonadditional_measurements(val, domain + "cpuusage", eventId, startEpochMicrosec, lastEpochMicrosec) + elif key == 'diskUsageArray': + process_nonadditional_measurements(val, domain + "diskusage", eventId, startEpochMicrosec, lastEpochMicrosec) + elif key == 'memoryUsageArray': + process_nonadditional_measurements(val, domain + "memoryusage", eventId, startEpochMicrosec, lastEpochMicrosec) + elif key == 'nicPerformanceArray': + process_nonadditional_measurements(val, domain + "nicperformance", eventId, startEpochMicrosec, lastEpochMicrosec) + elif key == 'loadArray': + process_nonadditional_measurements(val, domain + "load", eventId, startEpochMicrosec, lastEpochMicrosec) + elif key == 'networkSliceArray': + process_nonadditional_measurements(val, domain + "networkslice", eventId, startEpochMicrosec, lastEpochMicrosec) + elif isinstance(val, dict): + for key2, val2 in val.items(): + if isinstance(val2, str): + pdata = pdata + ',{}={}'.format(key2, process_special_char(val2)) + else: + nonstringpdata = nonstringpdata + '{}={},'.format(key2, val2) + else: + nonstringpdata = nonstringpdata + '{}={},'.format(key, val) + + send_to_influxdb(domain, pdata + nonstringpdata[:-1] + ' ' + process_time(eventTimestamp)) + + +def process_special_char(str): + for search_char, replace_char in {" ": "\ ", ",": "\,"}.items(): + str = str.replace(search_char, replace_char) + return str + + +def process_time(eventTimestamp): + eventTimestamp = str(eventTimestamp).replace(".", "") + while len(eventTimestamp) < 19: + eventTimestamp = eventTimestamp + "0" + return format(int(eventTimestamp)) + + +def save_event_in_db(body): + jobj = json.loads(body) + global source + global eventTimestamp + source = "unknown" + + domain = jobj['event']['commonEventHeader']['domain'] + eventTimestamp = jobj['event']['commonEventHeader']['startEpochMicrosec'] + agent = jobj['event']['commonEventHeader']['reportingEntityName'].upper() + if "LOCALHOST" in agent: + agent = "computehost" + source = jobj['event']['commonEventHeader']['sourceId'].upper() + + # processing common header part + pdata = domain + + nonstringpdata = " " + commonHeaderObj = jobj['event']['commonEventHeader'].items() + for key, val in commonHeaderObj: + if val != "": + if (key != 'internalHeaderFields'): + if isinstance(val, str): + pdata = pdata + ',{}={}'.format(key, process_special_char(val)) + else: + nonstringpdata = nonstringpdata + '{}={}'.format(key, val) + ',' + if (key == 'internalHeaderFields'): + for key2, val2 in val.items(): + if val2 != "": + if isinstance(val2, str): + pdata = pdata + ',{}={}'.format(key2, process_special_char(val2)) + else: + nonstringpdata = nonstringpdata + '{}={}'.format(key2, val2) + ',' + + # processing pnfRegistration events + if 'pnfRegistrationFields' in jobj['event']: + logger.debug('Found pnfRegistrationFields') + process_pnfRegistration_event(domain, + jobj['event']['pnfRegistrationFields'], + pdata, + nonstringpdata) + + # processing thresholdCrossingAlert events + if 'thresholdCrossingAlertFields' in jobj['event']: + logger.debug('Found thresholdCrossingAlertFields') + process_thresholdCrossingAlert_event(domain, + jobj['event']['thresholdCrossingAlertFields'], + pdata, + nonstringpdata) + + # processing fault events + if 'faultFields' in jobj['event']: + logger.debug('Found faultFields') + process_fault_event(domain, jobj['event']['faultFields'], pdata, nonstringpdata) + + # process heartbeat events + if 'heartbeatFields' in jobj['event']: + logger.debug('Found Heartbeat') + process_heartbeat_events(domain, + jobj['event']['heartbeatFields'], + pdata, + nonstringpdata) + + # processing measurement events + if 'measurementFields' in jobj['event']: + logger.debug('Found measurementFields') + process_measurement_events(domain, + jobj['event']['measurementFields'], + pdata, + nonstringpdata, + jobj['event']['commonEventHeader']['eventId'], + jobj['event']['commonEventHeader']['startEpochMicrosec'], + jobj['event']['commonEventHeader']['lastEpochMicrosec']) + + +def main(): + + # ---------------------------------------------------------------------- + # Setup argument parser so we can parse the command-line. + # ---------------------------------------------------------------------- + parser = ArgumentParser(description='', + formatter_class=ArgumentDefaultsHelpFormatter) + parser.add_argument('-i', '--influxdb', + dest='influxdb', + default='localhost', + help='InfluxDB server addresss') + parser.add_argument('-v', '--verbose', + dest='verbose', + action='count', + help='set verbosity level') + parser.add_argument('-c', '--config', + dest='config', + default='/opt/ves/connector/config/consumer.conf', + help='Use this config file.', + metavar='') + parser.add_argument('-s', '--section', + dest='section', + default='default', + metavar='
', + help='section to use in the config file') + + # ---------------------------------------------------------------------- + # Process arguments received. + # ---------------------------------------------------------------------- + args = parser.parse_args() + config_file = args.config + verbose = args.verbose + config_section = args.section + + # ---------------------------------------------------------------------- + # Now read the config file, using command-line supplied values as + # overrides. + # ---------------------------------------------------------------------- + overrides = {} + config = configparser.ConfigParser() + config['defaults'] = {'log_file': 'influxdbConnector.log' + } + config.read(config_file) + + # ---------------------------------------------------------------------- + # extract the values we want. + # ---------------------------------------------------------------------- + + global influxdb + global kafka_server + + influxdb = config.get(config_section, 'influxdb', vars=overrides) + log_file = config.get(config_section, 'log_file', vars=overrides) + kafka_server = config.get(config_section, 'kafka_server', vars=overrides) + + # ---------------------------------------------------------------------- + # Finally we have enough info to start a proper flow trace. + # ---------------------------------------------------------------------- + global logger + logger = logging.getLogger('monitor') + + if ((verbose is not None) and (verbose > 0)): + logger.info('Verbose mode on') + logger.setLevel(logging.DEBUG) + else: + logger.setLevel(logging.DEBUG) + handler = logging.handlers.RotatingFileHandler(log_file, + maxBytes=1000000, + backupCount=10) + if (platform.system() == 'Windows'): + date_format = '%Y-%m-%d %H:%M:%S' + else: + date_format = '%Y-%m-%d %H:%M:%S.%f %z' + formatter = logging.Formatter('%(asctime)s %(name)s - ' + '%(levelname)s - %(message)s', + date_format) + handler.setFormatter(formatter) + logger.addHandler(handler) + logger.info('Started') + # ---------------------------------------------------------------------- + # Log the details of the configuration. + # ---------------------------------------------------------------------- + logger.debug('Log file = {0}'.format(log_file)) + logger.debug('Influxdb server = {0}'.format(influxdb)) + logger.debug('kafka server = {0}'.format(kafka_server)) + + # ---------------------------------------------------------------------- + # kafka Consumer code . + # ---------------------------------------------------------------------- + + settings = { + 'bootstrap.servers': kafka_server, + 'group.id': 'mygroup', + 'client.id': 'client-1', + 'enable.auto.commit': True, + 'session.timeout.ms': 6000, + 'default.topic.config': {'auto.offset.reset': 'earliest'} + } + + c = Consumer(settings) + + c.subscribe(['measurement', 'pnfregistration', + 'fault', 'thresholdcrossingalert', 'heartbeat']) + + try: + while True: + msg = c.poll(0.1) + if msg is None: + continue + elif not msg.error(): + logger.debug('Recived message from topic name {} and offset number {}'.format(msg.topic(), msg.offset())) + # saving data in influxdb + save_event_in_db(msg.value()) + elif msg.error().code() == KafkaError._PARTITION_EOF: + logger.error('End of partition reached {0}/{1}' + .format(msg.topic(), msg.partition())) + else: + logger.error('Error occured: {0}'.format(msg.error().str())) + + except KeyboardInterrupt: + pass + + finally: + c.close() + + +if __name__ == '__main__': + main() diff --git a/kafka-connector-influxdb/influxdb-connector/config/influxdb_connector.conf b/kafka-connector-influxdb/influxdb-connector/config/influxdb_connector.conf new file mode 100755 index 0000000..dc0fc5d --- /dev/null +++ b/kafka-connector-influxdb/influxdb-connector/config/influxdb_connector.conf @@ -0,0 +1,4 @@ +[default] +log_file = /opt/ves/influxdbconnector.log +kafka_server = +influxdb = diff --git a/kafka-connector-influxdb/start.sh b/kafka-connector-influxdb/start.sh new file mode 100755 index 0000000..301d785 --- /dev/null +++ b/kafka-connector-influxdb/start.sh @@ -0,0 +1,57 @@ +# Copyright 2021 Xoriant Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +cd /opt/ves +touch monitor.log + +config_file="influxdb-connector/config/influxdb_connector.conf" + +sed -i -- "s/influxdb =/influxdb = $ves_influxdb_host:$ves_influxdb_port/g" \ + $config_file +sed -i -- "s/kafka_server =/kafka_server = $kafka_host_2:$kafka_port_2/g" \ + $config_file + +echo; echo $config_file +cat $config_file + +echo; echo "wait for InfluxDB API at $ves_influxdb_host:$ves_influxdb_port" +STARTTIME=$(date +%s) +max_time=60 +while ! curl http://$ves_influxdb_host:$ves_influxdb_port/ping ; + do + ELAPSED_TIME=$(($(date +%s) - $STARTTIME)) + if [ $ELAPSED_TIME -ge $max_time ]; then + echo "InfluxDB API is not yet up after several attempts! Exiting from script." + exit 1 + fi + echo "InfluxDB API is not yet responding... waiting 10 seconds" + sleep 10 + done + echo "Done." +echo; echo "setup veseventsdb in InfluxDB" +# TODO: check if pre-existing and skip +curl -X POST http://$ves_influxdb_host:$ves_influxdb_port/query \ + --data-urlencode "q=CREATE DATABASE veseventsdb" + +if [ "$ves_loglevel" != "" ]; then + python3 /opt/ves/influxdb-connector/code/influxdb_connector.py \ + --config /opt/ves/influxdb-connector/config/influxdb_connector.conf \ + --influxdb $ves_influxdb_host:$ves_influxdb_port \ + --section default > /opt/ves/monitor.log 2>&1 +else + python3 /opt/ves/influxdb-connector/code/influxdb_connector.py \ + --config /opt/ves/influxdb-connector/config/influxdb_connector.conf \ + --influxdb $ves_influxdb_host:$ves_influxdb_port \ + --section default +fi -- 2.16.6