From 5c7d42a17de1c1b0f30b776970e7fa6bd25097cc Mon Sep 17 00:00:00 2001 From: santanude Date: Tue, 17 May 2022 17:41:25 +0530 Subject: [PATCH] Modify configuration of InfluxDB container to avoid crash SMO-63 Signed-off-by: santanude Change-Id: I71790640f553a5cee20f292c4bdc8316bc47a3c4 --- docker-compose.yaml | 3 +++ .../influxdb-connector/code/influxdb_connector.py | 29 ++++++++++++---------- 2 files changed, 19 insertions(+), 13 deletions(-) diff --git a/docker-compose.yaml b/docker-compose.yaml index aa037cd..1c0edb6 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -22,6 +22,9 @@ services: - 8086:8086 networks: - smo-net + environment: + INFLUXDB_DATA_MAX_SERIES_PER_DATABASE: 0 + INFLUXDB_DATA_MAX_VALUES_PER_TAG: 0 smo-grafana: container_name: smo-grafana image: grafana/grafana:7.5.11 diff --git a/influxdb-connector/influxdb-connector/code/influxdb_connector.py b/influxdb-connector/influxdb-connector/code/influxdb_connector.py index 2cb67a8..fc12487 100644 --- a/influxdb-connector/influxdb-connector/code/influxdb_connector.py +++ b/influxdb-connector/influxdb-connector/code/influxdb_connector.py @@ -12,8 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. # -import sys -import os import platform import json import logging @@ -21,7 +19,6 @@ from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter import configparser import logging.handlers import requests -import urllib.request as url from confluent_kafka import Consumer, KafkaError # ------------------------------------------------------------------------------ @@ -32,13 +29,17 @@ influxdb = '127.0.0.1' logger = None + def send_to_influxdb(event, pdata): url = 'http://{}/write?db=eventsdb'.format(influxdb) logger.debug('Send {} to influxdb at {}: {}'.format(event, influxdb, pdata)) - r = requests.post(url, data=pdata, headers={'Content-Type': 'text/plain'}) - logger.info('influxdb return code {}'.format(r.status_code)) - assert (r.status_code == 204), logger.debug('*** Influxdb save failed, return code {} ***'.format(r.status_code)) - + try: + r = requests.post(url, data=pdata, headers={'Content-Type': 'text/plain'}) + logger.info('influxdb return code {}'.format(r.status_code)) + assert (r.status_code == 204), logger.debug('*** Influxdb save failed, return code {} ***'.format(r.status_code)) + except Exception as e: + logger.error('Exception occured while saving data : '.format(e)) + def process_additional_measurements(val, domain, eventId, startEpochMicrosec, lastEpochMicrosec): for additionalMeasurements in val: @@ -333,7 +334,6 @@ def main(): } config.read(config_file) - # ---------------------------------------------------------------------- # extract the values we want. # ---------------------------------------------------------------------- @@ -343,7 +343,7 @@ def main(): influxdb = config.get(config_section, 'influxdb', vars=overrides) log_file = config.get(config_section, 'log_file', vars=overrides) - kafka_server = config.get(config_section,'kafka_server', vars=overrides) + kafka_server = config.get(config_section, 'kafka_server', vars=overrides) assert (influxdb != ""), "Value of property 'influxdb' is missing in config file" assert (log_file != ""), "Value of property 'log_file' is missing in config file" @@ -384,7 +384,6 @@ def main(): # kafka Consumer code . # ---------------------------------------------------------------------- - settings = { 'bootstrap.servers': kafka_server, 'group.id': 'mygroup', @@ -396,8 +395,8 @@ def main(): c = Consumer(settings) - c.subscribe(['measurement','pnfregistration', - 'fault','thresholdcrossingalert','heartbeat']) + c.subscribe(['measurement', 'pnfregistration', + 'fault', 'thresholdcrossingalert', 'heartbeat']) try: while True: @@ -408,7 +407,10 @@ def main(): assert (msg.value() is not None), "Invalid message" logger.debug('Recived message from topic name {} and offset number {}'.format(msg.topic(), msg.offset())) # saving data in influxdb - save_event_in_db(msg.value()) + try: + save_event_in_db(msg.value()) + except Exception as e: + logger.error('Exception occured while saving data : '.format(e)) elif msg.error().code() == KafkaError._PARTITION_EOF: logger.error('End of partition reached {0}/{1}' .format(msg.topic(), msg.partition())) @@ -421,5 +423,6 @@ def main(): finally: c.close() + if __name__ == '__main__': main() -- 2.16.6