Remove redundant Elasticsearch code from the SMO collector 59/7859/2
author santanude <santanu.de@xoriant.com>
Wed, 2 Mar 2022 10:12:17 +0000 (15:42 +0530)
committer santanude <santanu.de@xoriant.com>
Thu, 3 Mar 2022 11:32:08 +0000 (17:02 +0530)
Drop the redundant Elasticsearch storage path from the collector: events
received by the VES listener are now persisted to Kafka only, so the
elasticsearch client dependency, the save_event_in_elasticsearch() code,
the related configuration keys, and the smo-elasticsearch service are
all removed.

Issue-ID: SMO-53

Signed-off-by: santanude <santanu.de@xoriant.com>
Change-Id: I6e1dbe7183308488284f85ee7577bd42247a58e1
collector/Dockerfile
collector/evel-test-collector/code/collector/monitor.py
collector/evel-test-collector/config/collector.conf
collector/start.sh
docker-compose.yaml

diff --git a/collector/Dockerfile b/collector/Dockerfile
index f2731eb..74c65ab 100755
@@ -22,7 +22,7 @@ FROM ubuntu:focal
 
 RUN apt-get update && apt-get -y upgrade
 RUN apt-get install -y git curl python3 python3-pip
-RUN pip3 install requests jsonschema elasticsearch kafka-python gevent
+RUN pip3 install requests jsonschema kafka-python gevent
 
 
 RUN mkdir -p /opt/smo/certs
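
With the elasticsearch client dropped from the pip install line, the image
should run on the trimmed dependency set alone. A quick smoke test (a sketch,
not part of the repo) is to import the remaining packages inside the built
container and confirm the removed one is gone:

    import importlib

    # packages the Dockerfile above still installs
    for pkg in ('requests', 'jsonschema', 'kafka', 'gevent'):
        importlib.import_module(pkg)

    # the elasticsearch client should no longer be importable
    try:
        importlib.import_module('elasticsearch')
        raise SystemExit('elasticsearch is still installed')
    except ModuleNotFoundError:
        print('dependency set trimmed as expected')
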
diff --git a/collector/evel-test-collector/code/collector/monitor.py b/collector/evel-test-collector/code/collector/monitor.py
index 62561e6..9daf78c 100755
@@ -34,7 +34,6 @@ import json
 import jsonschema
 from functools import partial
 from datetime import timezone
-from elasticsearch import Elasticsearch
 from kafka import KafkaProducer
 from json import dumps
 import datetime
@@ -212,12 +211,6 @@ def listener(environ, start_response, schema):
             # saving data in Kafka by default
             save_event_in_kafka(body)
 
-            logger.info("data_storage ={}".format(data_storage))
-            data_storageArr = data_storage.split("|")
-            # if word 'elasticsearch' exists in config file then save data in elasticsearch
-            if('elasticsearch' in data_storageArr):
-                save_event_in_elasticsearch(body)
-
         except jsonschema.SchemaError as e:
             logger.error('Schema is not valid! {0}'.format(e))
             # Stop container forcefully.
@@ -281,112 +274,6 @@ def produce_events_in_kafka(jobj, topic):
     except Exception as e:
         logger.error('Getting error while posting event into kafka bus {0}'.format(e))
 
-# --------------------------------------------------------------------------
-# Save event data in Elasticsearch
-# --------------------------------------------------------------------------
-
-
-def format_timestamp(EpochMicrosec):
-    if isinstance(EpochMicrosec, int):
-        return datetime.datetime.fromtimestamp(int(str(EpochMicrosec)[:10]), tz=timezone.utc)
-    else:
-        return datetime.datetime.fromtimestamp(EpochMicrosec, tz=timezone.utc)
-
-
-def save_event_in_elasticsearch(body):
-    jobj = json.loads(body)
-    eventId = jobj['event']['commonEventHeader']['eventId']
-    sourceId = jobj['event']['commonEventHeader']['sourceId']
-    startEpochMicrosec = jobj['event']['commonEventHeader']['startEpochMicrosec']
-    lastEpochMicrosec = jobj['event']['commonEventHeader']['lastEpochMicrosec']
-    jobj['event']['commonEventHeader']['startEpochMicrosecTimestamp'] = format_timestamp(startEpochMicrosec)
-    jobj['event']['commonEventHeader']['lastEpochMicrosecTimestamp'] = format_timestamp(lastEpochMicrosec)
-    domain = jobj['event']['commonEventHeader']['domain'].lower()
-    es = Elasticsearch([{'host': elasticsearch_domain, 'port': elasticsearch_port}])
-
-    if 'measurementFields' in jobj['event']:
-        if 'commonEventHeader' in jobj['event']:
-            es.index(index='measurement', body=jobj['event']['commonEventHeader'])
-
-        if 'additionalMeasurements' in jobj['event']['measurementFields']:
-            for addmeasure in jobj['event']['measurementFields']['additionalMeasurements']:
-                addmeasure['eventId'] = eventId
-                addmeasure['sourceId'] = sourceId
-                addmeasure['startEpochMicrosecTimestamp'] = jobj['event']['commonEventHeader']['startEpochMicrosecTimestamp']
-                addmeasure['lastEpochMicrosecTimestamp'] = jobj['event']['commonEventHeader']['lastEpochMicrosecTimestamp']
-                addmeasure['startEpochMicrosec'] = startEpochMicrosec
-                addmeasure['lastEpochMicrosec'] = lastEpochMicrosec
-                es.index(index='measurementaddlmeasurements', body=addmeasure)
-
-        if 'cpuUsageArray' in jobj['event']['measurementFields']:
-            for cpu in jobj['event']['measurementFields']['cpuUsageArray']:
-                cpu['eventId'] = eventId
-                cpu['sourceId'] = sourceId
-                cpu['startEpochMicrosecTimestamp'] = jobj['event']['commonEventHeader']['startEpochMicrosecTimestamp']
-                cpu['lastEpochMicrosecTimestamp'] = jobj['event']['commonEventHeader']['lastEpochMicrosecTimestamp']
-                cpu['startEpochMicrosec'] = startEpochMicrosec
-                cpu['lastEpochMicrosec'] = lastEpochMicrosec
-                es.index(index='measurementcpuusage', body=cpu)
-
-        if 'diskUsageArray' in jobj['event']['measurementFields']:
-            for disk in jobj['event']['measurementFields']['diskUsageArray']:
-                disk['eventId'] = eventId
-                disk['sourceId'] = sourceId
-                disk['startEpochMicrosecTimestamp'] = jobj['event']['commonEventHeader']['startEpochMicrosecTimestamp']
-                disk['lastEpochMicrosecTimestamp'] = jobj['event']['commonEventHeader']['lastEpochMicrosecTimestamp']
-                disk['startEpochMicrosec'] = startEpochMicrosec
-                disk['lastEpochMicrosec'] = lastEpochMicrosec
-                es.index(index='measurementdiskusage', body=disk)
-
-        if 'nicPerformanceArray' in jobj['event']['measurementFields']:
-            for vnic in jobj['event']['measurementFields']['nicPerformanceArray']:
-                vnic['eventId'] = eventId
-                vnic['sourceId'] = sourceId
-                vnic['startEpochMicrosecTimestamp'] = jobj['event']['commonEventHeader']['startEpochMicrosecTimestamp']
-                vnic['lastEpochMicrosecTimestamp'] = jobj['event']['commonEventHeader']['lastEpochMicrosecTimestamp']
-                vnic['startEpochMicrosec'] = startEpochMicrosec
-                vnic['lastEpochMicrosec'] = lastEpochMicrosec
-                es.index(index='measurementnicperformance', body=vnic)
-
-        if 'memoryUsageArray' in jobj['event']['measurementFields']:
-            for memory in jobj['event']['measurementFields']['memoryUsageArray']:
-                memory['eventId'] = eventId
-                memory['sourceId'] = sourceId
-                memory['startEpochMicrosecTimestamp'] = jobj['event']['commonEventHeader']['startEpochMicrosecTimestamp']
-                memory['lastEpochMicrosecTimestamp'] = jobj['event']['commonEventHeader']['lastEpochMicrosecTimestamp']
-                memory['startEpochMicrosec'] = startEpochMicrosec
-                memory['lastEpochMicrosec'] = lastEpochMicrosec
-                es.index(index='measurementmemoryusage', body=memory)
-
-        if 'loadArray' in jobj['event']['measurementFields']:
-            for load in jobj['event']['measurementFields']['loadArray']:
-                load['eventId'] = eventId
-                load['sourceId'] = sourceId
-                load['startEpochMicrosecTimestamp'] = jobj['event']['commonEventHeader']['startEpochMicrosecTimestamp']
-                load['lastEpochMicrosecTimestamp'] = jobj['event']['commonEventHeader']['lastEpochMicrosecTimestamp']
-                load['startEpochMicrosec'] = startEpochMicrosec
-                load['lastEpochMicrosec'] = lastEpochMicrosec
-                es.index(index='measurementload', body=load)
-
-        if 'networkSliceArray' in jobj['event']['measurementFields']:
-            for networkslice in jobj['event']['measurementFields']['networkSliceArray']:
-                networkslice['eventId'] = eventId
-                networkslice['sourceId'] = sourceId
-                networkslice['startEpochMicrosecTimestamp'] = jobj['event']['commonEventHeader']['startEpochMicrosecTimestamp']
-                networkslice['lastEpochMicrosecTimestamp'] = jobj['event']['commonEventHeader']['lastEpochMicrosecTimestamp']
-                networkslice['startEpochMicrosec'] = startEpochMicrosec
-                networkslice['lastEpochMicrosec'] = lastEpochMicrosec
-                es.index(index='measurementnetworkslice', body=networkslice)
-
-    if 'pnfRegistrationFields' in jobj['event']:
-        es.index(index=domain, body=jobj['event'])
-    if 'thresholdCrossingAlertFields' in jobj['event']:
-        es.index(index=domain, body=jobj['event'])
-    if 'faultFields' in jobj['event']:
-        es.index(index=domain, body=jobj['event'])
-    if 'heartbeatFields' in jobj['event']:
-        es.index(index=domain, body=jobj['event'])
-
 
 def test_listener(environ, start_response, schema):
     '''
@@ -576,9 +463,6 @@ USAGE
         global vel_username
         global vel_password
         global vel_topic_name
-        global data_storage
-        global elasticsearch_domain
-        global elasticsearch_port
         global kafka_server
         global kafka_topic
 
@@ -591,15 +475,6 @@ USAGE
         kafka_topic = config.get(config_section,
                                          'kafka_topic',
                                          vars=overrides)
-        data_storage = config.get(config_section,
-                                  'data_storage',
-                                  vars=overrides)
-        elasticsearch_domain = config.get(config_section,
-                                          'elasticsearch_domain',
-                                          vars=overrides)
-        elasticsearch_port = config.get(config_section,
-                                        'elasticsearch_port',
-                                        vars=overrides)
         vel_topic_name = config.get(config_section,
                                     'vel_topic_name',
                                     vars=overrides)
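
After this change the only persistence path left in monitor.py is Kafka:
listener() validates the incoming event and calls save_event_in_kafka(body)
unconditionally, with no data_storage branching. A minimal sketch of that
path, reusing names visible in the diff (save_event_in_kafka, kafka_server,
kafka_topic); the body is illustrative, not the exact upstream
implementation:

    import json
    from kafka import KafkaProducer

    kafka_server = 'smo-kafka:29092'   # kafka_server from collector.conf
    kafka_topic = 'smo-events'         # kafka_topic from collector.conf

    def save_event_in_kafka(body):
        # body is the raw JSON string received by the listener
        jobj = json.loads(body)
        producer = KafkaProducer(
            bootstrap_servers=[kafka_server],
            value_serializer=lambda v: json.dumps(v).encode('utf-8'))
        producer.send(kafka_topic, value=jobj)
        producer.flush()
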
diff --git a/collector/evel-test-collector/config/collector.conf b/collector/evel-test-collector/config/collector.conf
index 4d8078a..c5dbe21 100755
@@ -43,9 +43,6 @@ vel_port = 9999
 vel_path =
 vel_username =
 vel_password =
-data_storage =
-elasticsearch_domain =
-elasticsearch_port= 9200
 vel_topic_name = events
 kafka_server =
 kafka_topic =
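
With the Elasticsearch keys gone, the config template carries only the Kafka
settings. A hedged sketch of reading it the way monitor.py's config.get()
calls do (the section name here is an assumption; the real file may differ):

    import configparser

    config = configparser.ConfigParser()
    config.read('collector.conf')
    section = 'default'   # assumed section name

    kafka_server = config.get(section, 'kafka_server')
    kafka_topic = config.get(section, 'kafka_topic')

    # the deleted keys are genuinely gone: looking them up now raises
    # configparser.NoOptionError instead of returning a value
    assert not config.has_option(section, 'data_storage')
    assert not config.has_option(section, 'elasticsearch_domain')
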
diff --git a/collector/start.sh b/collector/start.sh
index 9e1c6e2..2e8da25 100755
@@ -46,10 +46,6 @@ sed -i -- "s/vel_password =/vel_password = $collector_pass/g" \
   $config_file
 sed -i -- "s~vel_path = vendor_event_listener/~vel_path = $collector_path~g" \
   $config_file
-sed -i -- "s/elasticsearch_domain =/elasticsearch_domain = $elasticsearch_domain/g" \
-  $config_file
-sed -i -- "s/data_storage =/data_storage = $data_storage/g" \
-  $config_file
 sed -i -- "s/kafka_server =/kafka_server = $smo_kafka_host:$smo_kafka_port/g" \
   $config_file
 sed -i -- "s/kafka_topic =/kafka_topic = $smo_kafka_topic/g" \
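
start.sh now injects only the Kafka settings into the template. A hedged
Python equivalent of the remaining sed substitutions, using the same
environment variable names as docker-compose.yaml (the config file path is
an assumption):

    import os

    config_file = 'collector.conf'   # assumed path inside the container

    with open(config_file) as f:
        conf = f.read()

    conf = conf.replace(
        'kafka_server =',
        'kafka_server = {}:{}'.format(os.environ['smo_kafka_host'],
                                      os.environ['smo_kafka_port']))
    conf = conf.replace(
        'kafka_topic =',
        'kafka_topic = ' + os.environ['smo_kafka_topic'])

    with open(config_file, 'w') as f:
        f.write(conf)
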
diff --git a/docker-compose.yaml b/docker-compose.yaml
index 8471ce5..2863090 100644
@@ -126,11 +126,9 @@ services:
          volumes:
              - ~/ves-certificate:/opt/smo/certs
          environment:
-                 elasticsearch_domain: "smo-elasticsearch"
                  smo_kafka_host: "smo-kafka"
                  smo_kafka_port: "29092"
                  smo_kafka_topic: "smo-events"
-                 data_storage: "elasticsearch"
                  collector_host: "smo-collector"
                  collector_port: "9999"
                  collector_user: "user"
@@ -180,18 +178,6 @@ services:
           depends_on:
              - agent-kafka
              - smo-collector
-  smo-elasticsearch:
-           image: docker.elastic.co/elasticsearch/elasticsearch:7.11.1
-           container_name: smo-elasticsearch
-           restart: always
-           environment:
-                  discovery.type: "single-node"
-                  ES_JAVA_OPTS: "-Xms1024m -Xmx1024m"
-           ports:
-                   - 9200:9200
-                   - 9300:9300
-           networks:
-                   - smo-net
   smo-post-config:
          container_name: smo-post-config
          build: ./postconfig
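
With the smo-elasticsearch service removed from the compose file, delivery
can be verified straight from the Kafka topic instead of querying an index.
A minimal consumer sketch using the host, port, and topic from the
environment block above (run from inside the compose network):

    import json
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(
        'smo-events',                            # smo_kafka_topic
        bootstrap_servers=['smo-kafka:29092'],   # smo_kafka_host:smo_kafka_port
        auto_offset_reset='earliest',
        consumer_timeout_ms=10000,               # stop if idle for 10s
        value_deserializer=lambda m: json.loads(m.decode('utf-8')))

    for msg in consumer:
        print(msg.value['event']['commonEventHeader']['eventId'])
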