From c83ea420b135a8fddd293c095c566c81ec46328c Mon Sep 17 00:00:00 2001 From: santanude Date: Wed, 2 Mar 2022 15:42:17 +0530 Subject: [PATCH] Remove redundant code in smo collector SMO-53 Signed-off-by: santanude Change-Id: I6e1dbe7183308488284f85ee7577bd42247a58e1 Signed-off-by: santanude --- collector/Dockerfile | 2 +- .../evel-test-collector/code/collector/monitor.py | 125 --------------------- .../evel-test-collector/config/collector.conf | 3 - collector/start.sh | 4 - docker-compose.yaml | 14 --- 5 files changed, 1 insertion(+), 147 deletions(-) diff --git a/collector/Dockerfile b/collector/Dockerfile index f2731eb..74c65ab 100755 --- a/collector/Dockerfile +++ b/collector/Dockerfile @@ -22,7 +22,7 @@ FROM ubuntu:focal RUN apt-get update && apt-get -y upgrade RUN apt-get install -y git curl python3 python3-pip -RUN pip3 install requests jsonschema elasticsearch kafka-python gevent +RUN pip3 install requests jsonschema kafka-python gevent RUN mkdir -p /opt/smo/certs diff --git a/collector/evel-test-collector/code/collector/monitor.py b/collector/evel-test-collector/code/collector/monitor.py index 62561e6..9daf78c 100755 --- a/collector/evel-test-collector/code/collector/monitor.py +++ b/collector/evel-test-collector/code/collector/monitor.py @@ -34,7 +34,6 @@ import json import jsonschema from functools import partial from datetime import timezone -from elasticsearch import Elasticsearch from kafka import KafkaProducer from json import dumps import datetime @@ -212,12 +211,6 @@ def listener(environ, start_response, schema): # saving data in Kafka by deafult save_event_in_kafka(body) - logger.info("data_storage ={}".format(data_storage)) - data_storageArr = data_storage.split("|") - # if word 'elasticsearch' exists in config file then save data in elasticsearch - if('elasticsearch' in data_storageArr): - save_event_in_elasticsearch(body) - except jsonschema.SchemaError as e: logger.error('Schema is not valid! {0}'.format(e)) # Stop container forcefully. @@ -281,112 +274,6 @@ def produce_events_in_kafka(jobj, topic): except Exception as e: logger.error('Getting error while posting event into kafka bus {0}'.format(e)) -# -------------------------------------------------------------------------- -# Save event data in Elasticsearch -# -------------------------------------------------------------------------- - - -def format_timestamp(EpochMicrosec): - if isinstance(EpochMicrosec, int): - return datetime.datetime.fromtimestamp(int(str(EpochMicrosec)[:10]), tz=timezone.utc) - else: - return datetime.datetime.fromtimestamp(EpochMicrosec, tz=timezone.utc) - - -def save_event_in_elasticsearch(body): - jobj = json.loads(body) - eventId = jobj['event']['commonEventHeader']['eventId'] - sourceId = jobj['event']['commonEventHeader']['sourceId'] - startEpochMicrosec = jobj['event']['commonEventHeader']['startEpochMicrosec'] - lastEpochMicrosec = jobj['event']['commonEventHeader']['lastEpochMicrosec'] - jobj['event']['commonEventHeader']['startEpochMicrosecTimestamp'] = format_timestamp(startEpochMicrosec) - jobj['event']['commonEventHeader']['lastEpochMicrosecTimestamp'] = format_timestamp(lastEpochMicrosec) - domain = jobj['event']['commonEventHeader']['domain'].lower() - es = Elasticsearch([{'host': elasticsearch_domain, 'port': elasticsearch_port}]) - - if 'measurementFields' in jobj['event']: - if 'commonEventHeader' in jobj['event']: - es.index(index='measurement', body=jobj['event']['commonEventHeader']) - - if 'additionalMeasurements' in jobj['event']['measurementFields']: - for addmeasure in jobj['event']['measurementFields']['additionalMeasurements']: - addmeasure['eventId'] = eventId - addmeasure['sourceId'] = sourceId - addmeasure['startEpochMicrosecTimestamp'] = jobj['event']['commonEventHeader']['startEpochMicrosecTimestamp'] - addmeasure['lastEpochMicrosecTimestamp'] = jobj['event']['commonEventHeader']['lastEpochMicrosecTimestamp'] - addmeasure['startEpochMicrosec'] = startEpochMicrosec - addmeasure['lastEpochMicrosec'] = lastEpochMicrosec - es.index(index='measurementaddlmeasurements', body=addmeasure) - - if 'cpuUsageArray' in jobj['event']['measurementFields']: - for cpu in jobj['event']['measurementFields']['cpuUsageArray']: - cpu['eventId'] = eventId - cpu['sourceId'] = sourceId - cpu['startEpochMicrosecTimestamp'] = jobj['event']['commonEventHeader']['startEpochMicrosecTimestamp'] - cpu['lastEpochMicrosecTimestamp'] = jobj['event']['commonEventHeader']['lastEpochMicrosecTimestamp'] - cpu['startEpochMicrosec'] = startEpochMicrosec - cpu['lastEpochMicrosec'] = lastEpochMicrosec - es.index(index='measurementcpuusage', body=cpu) - - if 'diskUsageArray' in jobj['event']['measurementFields']: - for disk in jobj['event']['measurementFields']['diskUsageArray']: - disk['eventId'] = eventId - disk['sourceId'] = sourceId - disk['startEpochMicrosecTimestamp'] = jobj['event']['commonEventHeader']['startEpochMicrosecTimestamp'] - disk['lastEpochMicrosecTimestamp'] = jobj['event']['commonEventHeader']['lastEpochMicrosecTimestamp'] - disk['startEpochMicrosec'] = startEpochMicrosec - disk['lastEpochMicrosec'] = lastEpochMicrosec - es.index(index='measurementdiskusage', body=disk) - - if 'nicPerformanceArray' in jobj['event']['measurementFields']: - for vnic in jobj['event']['measurementFields']['nicPerformanceArray']: - vnic['eventId'] = eventId - vnic['sourceId'] = sourceId - vnic['startEpochMicrosecTimestamp'] = jobj['event']['commonEventHeader']['startEpochMicrosecTimestamp'] - vnic['lastEpochMicrosecTimestamp'] = jobj['event']['commonEventHeader']['lastEpochMicrosecTimestamp'] - vnic['startEpochMicrosec'] = startEpochMicrosec - vnic['lastEpochMicrosec'] = lastEpochMicrosec - es.index(index='measurementnicperformance', body=vnic) - - if 'memoryUsageArray' in jobj['event']['measurementFields']: - for memory in jobj['event']['measurementFields']['memoryUsageArray']: - memory['eventId'] = eventId - memory['sourceId'] = sourceId - memory['startEpochMicrosecTimestamp'] = jobj['event']['commonEventHeader']['startEpochMicrosecTimestamp'] - memory['lastEpochMicrosecTimestamp'] = jobj['event']['commonEventHeader']['lastEpochMicrosecTimestamp'] - memory['startEpochMicrosec'] = startEpochMicrosec - memory['lastEpochMicrosec'] = lastEpochMicrosec - es.index(index='measurementmemoryusage', body=memory) - - if 'loadArray' in jobj['event']['measurementFields']: - for load in jobj['event']['measurementFields']['loadArray']: - load['eventId'] = eventId - load['sourceId'] = sourceId - load['startEpochMicrosecTimestamp'] = jobj['event']['commonEventHeader']['startEpochMicrosecTimestamp'] - load['lastEpochMicrosecTimestamp'] = jobj['event']['commonEventHeader']['lastEpochMicrosecTimestamp'] - load['startEpochMicrosec'] = startEpochMicrosec - load['lastEpochMicrosec'] = lastEpochMicrosec - es.index(index='measurementload', body=load) - - if 'networkSliceArray' in jobj['event']['measurementFields']: - for networkslice in jobj['event']['measurementFields']['networkSliceArray']: - networkslice['eventId'] = eventId - networkslice['sourceId'] = sourceId - networkslice['startEpochMicrosecTimestamp'] = jobj['event']['commonEventHeader']['startEpochMicrosecTimestamp'] - networkslice['lastEpochMicrosecTimestamp'] = jobj['event']['commonEventHeader']['lastEpochMicrosecTimestamp'] - networkslice['startEpochMicrosec'] = startEpochMicrosec - networkslice['lastEpochMicrosec'] = lastEpochMicrosec - es.index(index='measurementnetworkslice', body=networkslice) - - if 'pnfRegistrationFields' in jobj['event']: - es.index(index=domain, body=jobj['event']) - if 'thresholdCrossingAlertFields' in jobj['event']: - es.index(index=domain, body=jobj['event']) - if 'faultFields' in jobj['event']: - es.index(index=domain, body=jobj['event']) - if 'heartbeatFields' in jobj['event']: - es.index(index=domain, body=jobj['event']) - def test_listener(environ, start_response, schema): ''' @@ -576,9 +463,6 @@ USAGE global vel_username global vel_password global vel_topic_name - global data_storage - global elasticsearch_domain - global elasticsearch_port global kafka_server global kafka_topic @@ -591,15 +475,6 @@ USAGE kafka_topic = config.get(config_section, 'kafka_topic', vars=overrides) - data_storage = config.get(config_section, - 'data_storage', - vars=overrides) - elasticsearch_domain = config.get(config_section, - 'elasticsearch_domain', - vars=overrides) - elasticsearch_port = config.get(config_section, - 'elasticsearch_port', - vars=overrides) vel_topic_name = config.get(config_section, 'vel_topic_name', vars=overrides) diff --git a/collector/evel-test-collector/config/collector.conf b/collector/evel-test-collector/config/collector.conf index 4d8078a..c5dbe21 100755 --- a/collector/evel-test-collector/config/collector.conf +++ b/collector/evel-test-collector/config/collector.conf @@ -43,9 +43,6 @@ vel_port = 9999 vel_path = vel_username = vel_password = -data_storage = -elasticsearch_domain = -elasticsearch_port= 9200 vel_topic_name = events kafka_server = kafka_topic = diff --git a/collector/start.sh b/collector/start.sh index 9e1c6e2..2e8da25 100755 --- a/collector/start.sh +++ b/collector/start.sh @@ -46,10 +46,6 @@ sed -i -- "s/vel_password =/vel_password = $collector_pass/g" \ $config_file sed -i -- "s~vel_path = vendor_event_listener/~vel_path = $collector_path~g" \ $config_file -sed -i -- "s/elasticsearch_domain =/elasticsearch_domain = $elasticsearch_domain/g" \ - $config_file -sed -i -- "s/data_storage =/data_storage = $data_storage/g" \ - $config_file sed -i -- "s/kafka_server =/kafka_server = $smo_kafka_host:$smo_kafka_port/g" \ $config_file sed -i -- "s/kafka_topic =/kafka_topic = $smo_kafka_topic/g" \ diff --git a/docker-compose.yaml b/docker-compose.yaml index 8471ce5..2863090 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -126,11 +126,9 @@ services: volumes: - ~/ves-certificate:/opt/smo/certs environment: - elasticsearch_domain: "smo-elasticsearch" smo_kafka_host: "smo-kafka" smo_kafka_port: "29092" smo_kafka_topic: "smo-events" - data_storage: "elasticsearch" collector_host: "smo-collector" collector_port: "9999" collector_user: "user" @@ -180,18 +178,6 @@ services: depends_on: - agent-kafka - smo-collector - smo-elasticsearch: - image: docker.elastic.co/elasticsearch/elasticsearch:7.11.1 - container_name: smo-elasticsearch - restart: always - environment: - discovery.type: "single-node" - ES_JAVA_OPTS: "-Xms1024m -Xmx1024m" - ports: - - 9200:9200 - - 9300:9300 - networks: - - smo-net smo-post-config: container_name: smo-post-config build: ./postconfig -- 2.16.6