import jsonschema
from functools import partial
from datetime import timezone
-from elasticsearch import Elasticsearch
from kafka import KafkaProducer
from json import dumps
import datetime
# saving data in Kafka by deafult
save_event_in_kafka(body)
- logger.info("data_storage ={}".format(data_storage))
- data_storageArr = data_storage.split("|")
- # if word 'elasticsearch' exists in config file then save data in elasticsearch
- if('elasticsearch' in data_storageArr):
- save_event_in_elasticsearch(body)
-
except jsonschema.SchemaError as e:
logger.error('Schema is not valid! {0}'.format(e))
# Stop container forcefully.
except Exception as e:
logger.error('Getting error while posting event into kafka bus {0}'.format(e))
-# --------------------------------------------------------------------------
-# Save event data in Elasticsearch
-# --------------------------------------------------------------------------
-
-
-def format_timestamp(EpochMicrosec):
- if isinstance(EpochMicrosec, int):
- return datetime.datetime.fromtimestamp(int(str(EpochMicrosec)[:10]), tz=timezone.utc)
- else:
- return datetime.datetime.fromtimestamp(EpochMicrosec, tz=timezone.utc)
-
-
-def save_event_in_elasticsearch(body):
- jobj = json.loads(body)
- eventId = jobj['event']['commonEventHeader']['eventId']
- sourceId = jobj['event']['commonEventHeader']['sourceId']
- startEpochMicrosec = jobj['event']['commonEventHeader']['startEpochMicrosec']
- lastEpochMicrosec = jobj['event']['commonEventHeader']['lastEpochMicrosec']
- jobj['event']['commonEventHeader']['startEpochMicrosecTimestamp'] = format_timestamp(startEpochMicrosec)
- jobj['event']['commonEventHeader']['lastEpochMicrosecTimestamp'] = format_timestamp(lastEpochMicrosec)
- domain = jobj['event']['commonEventHeader']['domain'].lower()
- es = Elasticsearch([{'host': elasticsearch_domain, 'port': elasticsearch_port}])
-
- if 'measurementFields' in jobj['event']:
- if 'commonEventHeader' in jobj['event']:
- es.index(index='measurement', body=jobj['event']['commonEventHeader'])
-
- if 'additionalMeasurements' in jobj['event']['measurementFields']:
- for addmeasure in jobj['event']['measurementFields']['additionalMeasurements']:
- addmeasure['eventId'] = eventId
- addmeasure['sourceId'] = sourceId
- addmeasure['startEpochMicrosecTimestamp'] = jobj['event']['commonEventHeader']['startEpochMicrosecTimestamp']
- addmeasure['lastEpochMicrosecTimestamp'] = jobj['event']['commonEventHeader']['lastEpochMicrosecTimestamp']
- addmeasure['startEpochMicrosec'] = startEpochMicrosec
- addmeasure['lastEpochMicrosec'] = lastEpochMicrosec
- es.index(index='measurementaddlmeasurements', body=addmeasure)
-
- if 'cpuUsageArray' in jobj['event']['measurementFields']:
- for cpu in jobj['event']['measurementFields']['cpuUsageArray']:
- cpu['eventId'] = eventId
- cpu['sourceId'] = sourceId
- cpu['startEpochMicrosecTimestamp'] = jobj['event']['commonEventHeader']['startEpochMicrosecTimestamp']
- cpu['lastEpochMicrosecTimestamp'] = jobj['event']['commonEventHeader']['lastEpochMicrosecTimestamp']
- cpu['startEpochMicrosec'] = startEpochMicrosec
- cpu['lastEpochMicrosec'] = lastEpochMicrosec
- es.index(index='measurementcpuusage', body=cpu)
-
- if 'diskUsageArray' in jobj['event']['measurementFields']:
- for disk in jobj['event']['measurementFields']['diskUsageArray']:
- disk['eventId'] = eventId
- disk['sourceId'] = sourceId
- disk['startEpochMicrosecTimestamp'] = jobj['event']['commonEventHeader']['startEpochMicrosecTimestamp']
- disk['lastEpochMicrosecTimestamp'] = jobj['event']['commonEventHeader']['lastEpochMicrosecTimestamp']
- disk['startEpochMicrosec'] = startEpochMicrosec
- disk['lastEpochMicrosec'] = lastEpochMicrosec
- es.index(index='measurementdiskusage', body=disk)
-
- if 'nicPerformanceArray' in jobj['event']['measurementFields']:
- for vnic in jobj['event']['measurementFields']['nicPerformanceArray']:
- vnic['eventId'] = eventId
- vnic['sourceId'] = sourceId
- vnic['startEpochMicrosecTimestamp'] = jobj['event']['commonEventHeader']['startEpochMicrosecTimestamp']
- vnic['lastEpochMicrosecTimestamp'] = jobj['event']['commonEventHeader']['lastEpochMicrosecTimestamp']
- vnic['startEpochMicrosec'] = startEpochMicrosec
- vnic['lastEpochMicrosec'] = lastEpochMicrosec
- es.index(index='measurementnicperformance', body=vnic)
-
- if 'memoryUsageArray' in jobj['event']['measurementFields']:
- for memory in jobj['event']['measurementFields']['memoryUsageArray']:
- memory['eventId'] = eventId
- memory['sourceId'] = sourceId
- memory['startEpochMicrosecTimestamp'] = jobj['event']['commonEventHeader']['startEpochMicrosecTimestamp']
- memory['lastEpochMicrosecTimestamp'] = jobj['event']['commonEventHeader']['lastEpochMicrosecTimestamp']
- memory['startEpochMicrosec'] = startEpochMicrosec
- memory['lastEpochMicrosec'] = lastEpochMicrosec
- es.index(index='measurementmemoryusage', body=memory)
-
- if 'loadArray' in jobj['event']['measurementFields']:
- for load in jobj['event']['measurementFields']['loadArray']:
- load['eventId'] = eventId
- load['sourceId'] = sourceId
- load['startEpochMicrosecTimestamp'] = jobj['event']['commonEventHeader']['startEpochMicrosecTimestamp']
- load['lastEpochMicrosecTimestamp'] = jobj['event']['commonEventHeader']['lastEpochMicrosecTimestamp']
- load['startEpochMicrosec'] = startEpochMicrosec
- load['lastEpochMicrosec'] = lastEpochMicrosec
- es.index(index='measurementload', body=load)
-
- if 'networkSliceArray' in jobj['event']['measurementFields']:
- for networkslice in jobj['event']['measurementFields']['networkSliceArray']:
- networkslice['eventId'] = eventId
- networkslice['sourceId'] = sourceId
- networkslice['startEpochMicrosecTimestamp'] = jobj['event']['commonEventHeader']['startEpochMicrosecTimestamp']
- networkslice['lastEpochMicrosecTimestamp'] = jobj['event']['commonEventHeader']['lastEpochMicrosecTimestamp']
- networkslice['startEpochMicrosec'] = startEpochMicrosec
- networkslice['lastEpochMicrosec'] = lastEpochMicrosec
- es.index(index='measurementnetworkslice', body=networkslice)
-
- if 'pnfRegistrationFields' in jobj['event']:
- es.index(index=domain, body=jobj['event'])
- if 'thresholdCrossingAlertFields' in jobj['event']:
- es.index(index=domain, body=jobj['event'])
- if 'faultFields' in jobj['event']:
- es.index(index=domain, body=jobj['event'])
- if 'heartbeatFields' in jobj['event']:
- es.index(index=domain, body=jobj['event'])
-
def test_listener(environ, start_response, schema):
'''
global vel_username
global vel_password
global vel_topic_name
- global data_storage
- global elasticsearch_domain
- global elasticsearch_port
global kafka_server
global kafka_topic
kafka_topic = config.get(config_section,
'kafka_topic',
vars=overrides)
- data_storage = config.get(config_section,
- 'data_storage',
- vars=overrides)
- elasticsearch_domain = config.get(config_section,
- 'elasticsearch_domain',
- vars=overrides)
- elasticsearch_port = config.get(config_section,
- 'elasticsearch_port',
- vars=overrides)
vel_topic_name = config.get(config_section,
'vel_topic_name',
vars=overrides)