Modify configuration of InfluxDB container to avoid crash 22/8322/3
authorsantanude <santanu.de@xoriant.com>
Tue, 17 May 2022 12:11:25 +0000 (17:41 +0530)
committersantanu de <santanu.de@xoriant.com>
Thu, 19 May 2022 09:50:58 +0000 (09:50 +0000)
SMO-63

Signed-off-by: santanude <santanu.de@xoriant.com>
Change-Id: I71790640f553a5cee20f292c4bdc8316bc47a3c4

docker-compose.yaml
influxdb-connector/influxdb-connector/code/influxdb_connector.py

index aa037cd..1c0edb6 100644 (file)
@@ -22,6 +22,9 @@ services:
              - 8086:8086
         networks:
              - smo-net
+        environment:
+             INFLUXDB_DATA_MAX_SERIES_PER_DATABASE: 0
+             INFLUXDB_DATA_MAX_VALUES_PER_TAG: 0
   smo-grafana:
         container_name: smo-grafana
         image: grafana/grafana:7.5.11
index 2cb67a8..fc12487 100644 (file)
@@ -12,8 +12,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-import sys
-import os
 import platform
 import json
 import logging
@@ -21,7 +19,6 @@ from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
 import configparser
 import logging.handlers
 import requests
-import urllib.request as url
 from confluent_kafka import Consumer, KafkaError
 
 # ------------------------------------------------------------------------------
@@ -32,13 +29,17 @@ influxdb = '127.0.0.1'
 
 logger = None
 
+
 def send_to_influxdb(event, pdata):
     url = 'http://{}/write?db=eventsdb'.format(influxdb)
     logger.debug('Send {} to influxdb at {}: {}'.format(event, influxdb, pdata))
-    r = requests.post(url, data=pdata, headers={'Content-Type': 'text/plain'})
-    logger.info('influxdb return code {}'.format(r.status_code))
-    assert (r.status_code == 204), logger.debug('*** Influxdb save failed, return code {} ***'.format(r.status_code))
-    
+    try:
+        r = requests.post(url, data=pdata, headers={'Content-Type': 'text/plain'})
+        logger.info('influxdb return code {}'.format(r.status_code))
+        assert (r.status_code == 204), logger.debug('*** Influxdb save failed, return code {} ***'.format(r.status_code))
+    except Exception as e:
+        logger.error('Exception occured while saving data : '.format(e))
+
 
 def process_additional_measurements(val, domain, eventId, startEpochMicrosec, lastEpochMicrosec):
     for additionalMeasurements in val:
@@ -333,7 +334,6 @@ def main():
                           }
     config.read(config_file)
 
-
     # ----------------------------------------------------------------------
     # extract the values we want.
     # ----------------------------------------------------------------------
@@ -343,7 +343,7 @@ def main():
 
     influxdb = config.get(config_section, 'influxdb', vars=overrides)
     log_file = config.get(config_section, 'log_file', vars=overrides)
-    kafka_server = config.get(config_section,'kafka_server', vars=overrides)
+    kafka_server = config.get(config_section, 'kafka_server', vars=overrides)
 
     assert (influxdb != ""), "Value of property 'influxdb' is missing in config file"
     assert (log_file != ""), "Value of property 'log_file' is missing in config file"
@@ -384,7 +384,6 @@ def main():
     # kafka Consumer code .
     # ----------------------------------------------------------------------
 
-
     settings = {
         'bootstrap.servers': kafka_server,
         'group.id': 'mygroup',
@@ -396,8 +395,8 @@ def main():
 
     c = Consumer(settings)
 
-    c.subscribe(['measurement','pnfregistration',
-    'fault','thresholdcrossingalert','heartbeat'])
+    c.subscribe(['measurement', 'pnfregistration',
+    'fault', 'thresholdcrossingalert', 'heartbeat'])
 
     try:
         while True:
@@ -408,7 +407,10 @@ def main():
                 assert (msg.value() is not None), "Invalid message"
                 logger.debug('Recived message from topic name {} and offset number {}'.format(msg.topic(), msg.offset()))
                 # saving data in influxdb
-                save_event_in_db(msg.value())
+                try:
+                    save_event_in_db(msg.value())
+                except Exception as e:
+                    logger.error('Exception occured while saving data : '.format(e))
             elif msg.error().code() == KafkaError._PARTITION_EOF:
                 logger.error('End of partition reached {0}/{1}'
                       .format(msg.topic(), msg.partition()))
@@ -421,5 +423,6 @@ def main():
     finally:
         c.close()
 
+
 if __name__ == '__main__':
     main()